diff --git a/arrow-docs/docs/fx/async/README.md b/arrow-docs/docs/fx/async/README.md index fa2fb5993..090ac1956 100644 --- a/arrow-docs/docs/fx/async/README.md +++ b/arrow-docs/docs/fx/async/README.md @@ -1,94 +1,59 @@ --- layout: docs-fx -title: Arrow Fx - Asynchronous & Concurrent Programming +title: Arrow Fx Coroutines - Asynchronous & Concurrent Programming permalink: /fx/async/ --- # Asynchronous & Concurrent Programming -Arrow Fx benefits from the `!effect` application and direct syntax for asynchronous programming by yielding extremely succinct programs without callbacks. This allows us to use direct style syntax with asynchronous and concurrent operations while preserving effect control in the types and runtime, and bind their results to the left-hand side. The resulting expressions enjoy the same syntax that most OOP and Java programmers are already accustomed to—direct blocking imperative style. +Arrow Fx benefits from the `suspend` syntax for extremely succinct programs without callbacks. +This allows us to use direct style syntax with asynchronous and concurrent operations while preserving effect control in the types and runtime, and bind their results to the left-hand side. +The resulting expressions enjoy the same syntax that most OOP and Java programmers are already accustomed to—direct blocking imperative style. ## Dispatchers and Contexts Performing effects while switching execution contexts a la carte is trivial. ```kotlin:ank:playground -import arrow.fx.IO -import arrow.unsafe -import arrow.fx.extensions.io.unsafeRun.runBlocking -import arrow.fx.extensions.fx -import kotlinx.coroutines.newSingleThreadContext +import arrow.fx.coroutines.* //sampleStart -val contextA = newSingleThreadContext("A") - suspend fun printThreadName(): Unit = println(Thread.currentThread().name) -val program = IO.fx { - continueOn(contextA) - !effect { printThreadName() } - continueOn(dispatchers().default()) - !effect { printThreadName() } -} -//sampleEnd -fun main() { // The edge of our world - unsafe { runBlocking { program } } -} -``` - -In addition to `continueOn`, Arrow Fx allows users to override the executions context in all functions that require one. - -## Fibers - -A [Fiber]({{'/effects/fiber' | relative_url }}) represents the pure result of a [Concurrent] data type starting concurrently that can be either `join`ed or `cancel`ed. - -```kotlin:ank:playground -import arrow.fx.IO -import arrow.unsafe -import arrow.fx.extensions.io.unsafeRun.runBlocking -import arrow.fx.extensions.fx - -//sampleStart -suspend fun threadName(): String = - Thread.currentThread().name - -val program = IO.fx { - val fiberA = !effect { threadName() }.fork(dispatchers().default()) - val fiberB = !effect { threadName() }.fork(dispatchers().default()) - val threadA = !fiberA.join() - val threadB = !fiberB.join() - !effect { println(threadA) } - !effect { println(threadB) } +suspend fun main(): Unit { + evalOn(ComputationPool) { + printThreadName() + } + evalOn(IOPool) { + printThreadName() + } } //sampleEnd -fun main() { // The edge of our world - unsafe { runBlocking { program } } -} ``` -When we spawn fibers, we can obtain their deferred non-blocking result using `join()` and destructuring the effect. +In addition to `evalOn`, Arrow Fx allows users to override the executions context in all functions that require one. -`dispatchers().default()` is an execution context that's available to all concurrent data types, such as IO, that you can use directly on `fx` blocks. +## Parallelization & Concurrency -Note that, because we are using `Fiber` and a Dispatcher that may not create new threads in all cases here, there is no guarantee that the printed thread names will be different. +Arrow Fx comes with built-in versions of `parMapN`, `parTraverse`, and `parSequence` and many more allowing users to dispatch effects in parallel and receive non-blocking results and direct syntax without wrappers. +All parallel suspend operators in Arrow Fx behave in the following way. -This is part of the greatness of Fibers. They run as scheduled, based on the policies provided by the Dispatcher's Context. + - When one of the parallel task fails, the others are also cancelled since a result cannot be determined. This will allow the other parallel operations to gracefully exit and close their resources before returning. -## Parallelization & Concurrency + - When the resulting suspend operation is cancelled than all running fibers inside will also be cancelled so that all paralell running task can gracefully exit and close their resources before returning. -Arrow Fx comes with built-in versions of `parMapN`, `parTraverse`, and `parSequence`, allowing users to dispatch effects in parallel and receive non-blocking results and direct syntax without wrappers. +For more documentation on parallel operations see below. -### `parMapN` +### `parMapN`/`parTupledN` -`parMapN` allows *N#* effects to run in parallel non-blocking waiting for all results to complete, and then delegates to a user-provided function that applies a final transformation over the results. -Once the function specifies a valid return, we can observe how the returned non-blocking value is bound on the left-hand side. +`parMapN` allows *N#* effects to run in parallel on a given `CoroutineContext` suspending until all results completed, and then apply the user-provided transformation over the results. +All input suspend functions are guaranteed to dispatch on the given CoroutineContext before they start running. +It also wires their respective cancellation. That means that cancelling the resulting suspend fun will cancel both functions running in parallel inside. +Additionally, the function does not return until both tasks are finished and their results combined by f: (A, B) -> C. ```kotlin:ank:playground -import arrow.fx.IO -import arrow.unsafe -import arrow.fx.extensions.io.unsafeRun.runBlocking -import arrow.fx.extensions.fx +import arrow.fx.coroutines.* //sampleStart suspend fun threadName(): String = @@ -99,163 +64,282 @@ data class ThreadInfo( val threadB: String ) -val program = IO.fx { +suspend fun main(): Unit { val (threadA: String, threadB: String) = - !IO.parMapN( - dispatchers().default(), - effect { threadName() }, - effect { threadName() }, - ::ThreadInfo - ) - !effect { println(threadA) } - !effect { println(threadB) } + parMapN(::threadName, ::threadName) { (a, b) -> + ThreadInfo(a, b) + } + + println(threadA) + println(threadB) } //sampleEnd -fun main() { // The edge of our world - unsafe { runBlocking { program } } -} ``` ### `parTraverse` -`parTraverse` allows any `Iterable A>` to iterate over its contained effects in parallel as we apply a user-provided function over each effect result, and then gather all the transformed results in a `List`. +`parTraverse` allows to map elements of the same type `A` in parallel for a given `Iterable`, and then gather all the transformed results in a `List`. +Cancelling the caller will cancel all running operations inside parTraverse gracefully. ```kotlin:ank:playground -import arrow.fx.IO -import arrow.unsafe -import arrow.fx.extensions.io.unsafeRun.runBlocking -import arrow.fx.extensions.fx +import arrow.fx.coroutines.* //sampleStart suspend fun threadName(i: Int): String = "$i on ${Thread.currentThread().name}" -val program = IO.fx { - val result: List = ! - listOf(1, 2, 3).parTraverse { i -> - effect { threadName(i) } - } - !effect { println(result) } +suspend fun main(): Unit { + val result: List = + listOf(1, 2, 3).parTraverse(::threadName) + + println(result) } //sampleEnd -fun main() { // The edge of our world - unsafe { runBlocking { program } } +``` + +### `raceN` + +`raceN` allows *N#* effects to race in parallel and non-blocking waiting for the first results to complete, and then cancel all remaining racers. +Once the function specifies a valid return, we can observe how the returned non-blocking value is bound on the left-hand side. + +```kotlin:ank:playground +import arrow.fx.coroutines.* + +//sampleStart +suspend fun loser(): Unit = + never() // Never wins + +suspend fun winner(): Int { + sleep(5.milliseconds) + return 5 +} + +suspend fun main(): Unit { + val res = raceN({ loser() }, { winner() }) + + println(res) } +//sampleEnd ``` -### `parSequence` +## Fibers -`parSequence` applies all effects in `Iterable A>` in non-blocking in parallel, then gathers all the transformed results and returns them in a `List`. +A [Fiber]({{'/effects/fiber' | relative_url }}) represents the pure result of an operation starting concurrently that can be either `join`ed or `cancel`ed. +All the operators above can also be build using `Fiber`s, be aware that this is considered more low level. +You should *always* prefer out-of-the-box operators, unless you want to launch concurrent processes explicitly. ```kotlin:ank:playground -import arrow.fx.IO -import arrow.unsafe -import arrow.fx.extensions.io.unsafeRun.runBlocking -import arrow.fx.extensions.fx +import arrow.fx.coroutines.* //sampleStart suspend fun threadName(): String = Thread.currentThread().name -val program = IO.fx { - val result: List = !listOf( - effect { threadName() }, - effect { threadName() }, - effect { threadName() } - ).parSequence() +val ctx = ComputationPool - !effect { println(result) } +suspend fun main(): Unit { + val fiberA = ForkConnected(ctx) { threadName() } + val fiberB = ForkConnected(ctx) { threadName() } + val threadA = fiberA.join() + val threadB = fiberB.join() + println(threadA) + println(threadB) } //sampleEnd -fun main() { // The edge of our world - unsafe { runBlocking { program } } +``` + +When we spawn fibers, we can obtain their deferred non-blocking result using `join()`. + +Note that, because we are using `Fiber` and a `ComputationPool` `CoroutineContext` that may not create new threads in all cases here, there is no guarantee that the printed thread names will be different. +This is part of the greatness of Fibers. They run as scheduled, based on the policies provided by the (dispatching) `CoroutineContext`. + +## Cancellation + +The cancellation system exists out of a few simple building blocks. + +All operators found in Arrow Fx check for cancellation. In the small example of an infinite sleeping loop below `sleep` checks for cancellation and thus this function also check for cancellation before/and while sleeping. + +```kotlin:ank +tailrec suspend fun sleeper(): Unit { + println("I am sleepy. I'm going to nap") + sleep(1.seconds) // <-- cancellation check-point + println("1 second nap.. Going to sleep some more") + sleeper() } ``` -## Arrow Fx vs KotlinX Coroutines +#### cancelBoundary() -Arrow Fx can be seen as a companion to the KotlinX Coroutines library in the same way that Arrow serves as a companion to the Kotlin standard library in providing the abstractions and runtime to implement Typed FP in Kotlin. +Calling `suspend fun cancelBoundary()` will check for cancellation, and will gracefully exit in case the effect was cancelled. An example. -Arrow Fx adds an extra layer of security and effect control where we can easily model side effects and how they interact with pure computations. +``` +suspend fun loop(): Unit { + while(true) { + cancelBoundary() // cancellable computation loop + println("I am getting dizzy...") + } +} +``` + +This `while` will `loop` until the cancel signal is triggered. Once the cancellation is trigger, this task will gracefully exit through `cancelBoundary()`. + +In case you don't want to check for cancellation so often, you can also only install a `cancelBoundary` every n batches. +The example below defines `repeat` which checks cancellation every `10` repetition. + +```kotlin:ank +tailrec suspend fun repeat(n: Int): Unit { + if (n % 10 == 0) cancelBoundary() + if (n == 0) Unit + else repeat(n - 1) +} +``` + +#### Uncancellable -In contrast with the coroutines library, where `Deferred` computations are eager by default and fire immediately when instantiated, in Arrow Fx, all bindings and compositions are lazy and suspended, ensuring execution is explicit and always deferred until the last second. +So how can you execute of `suspend fun` with guarantee that it cannot be cancelled. You simply `wrap` it in the `uncancelable` builder and the function will guarantee not to be cancelled. If the progam is already cancelled before, this block will not run and if it gets cancelled during the execution of this block it will exit immediately after. -Deferring execution and being able to suspend side effects is essential for programs built with Arrow because we can ensure that effects run in a controlled environment and preserve the properties of purity and referential transparency, allowing us to apply equational reasoning over the different parts that conform our programs. +```kotlin:ank +suspend fun uncancellableSleep(duration: Duration): Unit = + uncancellable { sleep(duration) } +``` + +If we now re-implement our previous `sleeper`, than it will behave a little different from before. The cancellation check before and after `uncancellableSleep` but note that the `sleep` istelf will not be cancelled. -Since Arrow Fx uses this lazy behavior by default, we don't have to resort to special configuration arguments when creating deferred computations. +```kotlin:ank +tailrec suspend fun sleeper(): Unit { + println("I am sleepy. I'm going to nap") + // <-- cancellation check-point + uncancellableSleep(1.seconds) + // <-- cancellation check-point + println("1 second nap.. Going to sleep some more") + sleeper() +} +``` -The value `program` below is pure and referentially transparent because `fx` returns a lazy computation: +This also means that our new sleep can back-pressure `timeOutOrNull`. ```kotlin:ank:playground -import arrow.fx.IO -import arrow.unsafe -import arrow.fx.extensions.io.unsafeRun.runBlocking -import arrow.fx.extensions.fx -//sampleStart -suspend fun printThreadName(): Unit = - println(Thread.currentThread().name) +import arrow.fx.coroutines.* -val program = IO.fx { - !effect { printThreadName() } +suspend fun main(): Unit { + val r = timeOutOrNull(1.seconds) { + uncancellable { sleep(2.seconds) } + } // r is null, but took 2 seconds. } +``` + +## Resource Safety -fun main() { // The edge of our world - unsafe { runBlocking { program } } +To ensure resource safety we need to take care with cancellation since we don't wont our process to be cancelled but our resources to remain open. + +There Arrow Fx offers 2 tools `Resource` and `suspend fun bracketCase`. Any `resource` operations exists out of 3 steps. + +1. Acquiring the resource +2. Using the resource +3. Releasing the resource with either a result, a `Throwable` or `Cancellation`. + +To ensure the resource can be correctly acquired we make the `acquire` & `release` step `uncancelable`. +If the `bracketCase` was cancelled during `acquire` it'll immediately go to `release`, skipping the `use` step. + +`bracketCase` is defined below, in the `release` step you can inspect the `ExitCase` of the `acquire`/`use`. + +``` +sealed ExitCase { + object Completed: ExitCase() + object Cancelled: ExitCase() + data class Error(val error: Throwable): ExitCase() } -//sampleEnd + +suspend fun bracketCase(acquire: suspend () -> A, use: suspend (A) -> B, release: (a, ExitCase) -> B): B +``` + +`bracket` is an overload of `bracketCase` that ignores the `ExitCase` value, a simple example. +We want to create a function to safely create and consume a `DatabaseConnection` that always needs to be closed no matter what the _ExitCase_. + +```kotlin:ank +class DatabaseConnection { + suspend fun open(): Unit = println("Database connection opened") + suspend fun close(): Unit = println("Database connection closed") +} + +suspend fun onDbConnection(f: suspend (DatabaseConnection) -> A): A = + bracket( + acquire = { DatabaseConnection().apply { open() } }, + use = f, + release = DatabaseConnection::close + ) ``` -Using the same with the default `async` constructor from the coroutines library will yield an impure function because effects are not controlled and they fire immediately upon function invocation: +The difference between `Resource` is that `bracketCase` is simple function, while `Resource` is a data type, both ensure that resources are `acquire`d and `release`d correctly. +It also forms a `Monad` so you can use it to safely compose `Resource`s, map them or safely traverse `Resource`s. ```kotlin:ank:playground -import kotlinx.coroutines.* -import kotlin.system.* +import arrow.fx.coroutines.* -//sampleStart -suspend fun printThreadName(): Unit = - println(Thread.currentThread().name) +class DatabaseConnection { + suspend fun open(): Unit = println("Database connection opened") + suspend fun close(): Unit = println("Database connection closed") + suspend fun query(id: String): String = + id.toUpperCase() +} -suspend fun program() = - GlobalScope.async { printThreadName() } +val conn: Resource = + Resource( + { DatabaseConnection().apply { open() } }, + DatabaseConnection::close + ) -fun main() { - runBlocking { program().await() } +suspend fun main(): Unit { + val res = conn.use { db -> + db.query("hello, world!") + } + + println(res) } -//sampleEnd ``` -In the previous program, `printThreadName()` may be invoked before we call `await`. -If we want a pure lazy version of this operation, we need to hint to the `async` constructor that our policy is not to start right away. +## Arrow Fx Coroutines, KotlinX Coroutines & Kotlin Standard Library + +### Demystify Coroutine + +Kotlin's standard library defines a `Coroutine` as an instance of a suspendable computation. + +In other words, a `Coroutine` is a compiled `suspend () -> A` program wired to a `Continuation`. + +Which can be created by using [`kotlin.coroutines.intrinsics.createCoroutineUnintercepted`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.intrinsics/create-coroutine-unintercepted.html). + +So let's take a quick look at an example. ```kotlin:ank:playground -import kotlinx.coroutines.* -import kotlin.system.* +import kotlin.coroutines.Continuation +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.intrinsics.createCoroutineUnintercepted +import kotlin.coroutines.resume -//sampleStart -suspend fun printThreadName(): Unit = - println(Thread.currentThread().name) +suspend fun one(): Int = 1 -suspend fun program() = - GlobalScope.async(start = CoroutineStart.LAZY) { printThreadName() } +val cont: Continuation = ::one + .createCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { println(it) }) fun main() { - runBlocking { program().await() } + cont.resume(Unit) } -//sampleEnd ``` -If an `async` computation fires immediately, it does not give us a chance to suspend side effects. This implies that all functions that immediately produce their effects when invoked are impure and non-referentially transparent. This is the default in the KotlinX Coroutines Library. +As you can see here above we create a `Coroutine` using `createCoroutineUnintercepted` which returns us `Continuation`. +Strange, you might've expected a `Coroutine` type but a `Coroutine` is represented by `Continuation`. -Arrow Fx is not opinionated as to whether eagerly firing is a more or less appropriate technique. We, the authors, understand this style gathers a different audience where purity and referential transparency may not be goals or optimization techniques are in play, and that's just fine. +This `typealias Coroutine = Contination` will start running every time you call `resume(Unit)`, which allows you to run the suspend program N times. -Life goes on. +### Arrow Fx Coroutines & KotlinX Coroutines -Arrow Fx offers, in contrast, a different approach that is in line with Arrow's primary concern——helping you, as a user, create well-typed, safe, and pure programs in Kotlin. +Both Arrow Fx Coroutines & KotlinX Coroutines independently offer an implementation for Kotlin's coroutine system. -On top of complementing the KotlinX Coroutines API, Arrow Fx provides interoperability with its runtime, allowing you to run polymorphic programs over the KotlinX Coroutines, Rx2, Reactor, and even custom runtimes. +As explained in the document above, Arrow Fx Coroutines offers a battery-included functional IO with cancellation support. +Where KotlinX Coroutines offers an implementation that offers light-weight futures with cancellation support. ## Integrating with third-party libraries -Arrow Fx integrates with the Arrow Effects IO runtime, Rx2, Reactor framework, and any library that models effectful async/concurrent computations and can provide a `@extension` to the `ConcurrentEffect` type class defined in the `arrow-effects` module out of the box. +Arrow Fx integrates with the Arrow Fx IO runtime, Rx2, Reactor framework, and any library that can model effectful async/concurrent computations as `suspend`. If you are interested in providing your own runtime as a backend to the Arrow Fx library, please contact us in the main [Arrow Gitter](https://gitter.im/arrow-kt/Lobby) or #Arrow channel on the official [Kotlin Lang Slack](https://kotlinlang.slack.com/messages/C5UPMM0A0) with any questions and we'll help you along the way. diff --git a/arrow-fx-coroutines/README.MD b/arrow-fx-coroutines/README.MD index 9e3b722ab..4d6c3d341 100644 --- a/arrow-fx-coroutines/README.MD +++ b/arrow-fx-coroutines/README.MD @@ -208,9 +208,11 @@ tailrec suspend fun sleeper(): Unit { This also means that our new sleep can back-pressure `timeOutOrNull`. ```kotlin:ank -suspend main(): Unit { +import arrow.fx.coroutines.* + +suspend fun main(): Unit { val r = timeOutOrNull(1.seconds) { - uncancelable { sleep(2.seconds) } + uncancellable { sleep(2.seconds) } } // r is null, but took 2 seconds. } ``` @@ -306,7 +308,7 @@ val resources: List> = val resource: Resource> = resources.sequence(Resource.applicative()) -suspend main(): Unit { +suspend fun main(): Unit { resource.use { files -> files.parTraverse(IODispatchers.IOPool) { file -> file.toString() @@ -399,7 +401,7 @@ Simple constructs like `suspend fun Either.catch(f: () -> A): Either(n)) { println("Hello") } @@ -409,7 +411,7 @@ suspend main(): Unit { Alternatively we can re-use this `Schedule` to `retry` a `suspend fun` `n` times when it fails. ```kotlin:ank -suspend main(): Unit { +suspend fun main(): Unit { retry(Schedule.recurs(n)) { println("I am going to do nothing but throw a tantrum!") throw RuntimeException("Boom!") @@ -424,8 +426,8 @@ fun schedule(): Schedule> = Schedule { (recurs(10) and spaced(10.seconds)) zipRight collect() } -suspend main(): Unit { - var count = Atomic(0) +suspend fun main(): Unit { + val count = Atomic(0) val history: List = repeat(schedule()) { println("Incrementing the ref") diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Bracket.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Bracket.kt index 8168af32d..94898cf03 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Bracket.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Bracket.kt @@ -21,6 +21,7 @@ sealed class ExitCase { /** * Runs [f] in an uncancellable manner. + * If [f] gets cancelled, it will back-pressure the cancelling operation until finished. * * ```kotlin:ank:playground * import arrow.fx.coroutines.* @@ -56,6 +57,17 @@ suspend fun uncancellable(f: suspend () -> A): A = } } +/** + * Registers an [onCancel] handler after [fa]. + * [onCancel] is guaranteed to be called in case of cancellation, otherwise it's ignored. + * + * Useful for wiring cancellation tokens between fibers, building inter-op with other effect systems or testing. + * + * @param fa program that you want to register handler on + * @param onCancel handler to run when [fa] gets cancelled. + * @see guarantee for registering a handler that is guaranteed to always run. + * @see guaranteeCase for registering a handler that executes for any [ExitCase]. + */ suspend fun onCancel( fa: suspend () -> A, onCancel: suspend () -> Unit @@ -66,22 +78,149 @@ suspend fun onCancel( } } +/** + * Guarantees execution of a given [finalizer] after [fa] regardless of success, error or cancellation. + * + * As best practice, it's not a good idea to release resources via [guarantee]. + * since [guarantee] doesn't properly model acquiring, using and releasing resources. + * It only models scheduling of a finalizer after a given suspending program, + * so you should prefer [Resource] or [bracket] which captures acquiring, + * using and releasing into 3 separate steps to ensure resource safety. + * + * @param fa program that you want to register handler on + * @param finalizer handler to run after [fa]. + * @see guaranteeCase for registering a handler that tracks the [ExitCase] of [fa]. + */ suspend fun guarantee( fa: suspend () -> A, - release: suspend () -> Unit -): A = guaranteeCase(fa) { release.invoke() } + finalizer: suspend () -> Unit +): A = guaranteeCase(fa) { finalizer.invoke() } +/** + * Guarantees execution of a given [finalizer] after [fa] regardless of success, error or cancellation., allowing + * for differentiating between exit conditions with to the [ExitCase] argument of the finalizer. + * + * As best practice, it's not a good idea to release resources via [guaranteeCase]. + * since [guaranteeCase] doesn't properly model acquiring, using and releasing resources. + * It only models scheduling of a finalizer after a given suspending program, + * so you should prefer [Resource] or [bracketCase] which captures acquiring, + * using and releasing into 3 separate steps to ensure resource safety. + * + * @param fa program that you want to register handler on + * @param finalizer handler to run after [fa]. + * @see guarantee for registering a handler that ignores the [ExitCase] of [fa]. + */ suspend fun guaranteeCase( fa: suspend () -> A, - release: suspend (ExitCase) -> Unit -): A = bracketCase({ Unit }, { fa.invoke() }, { _, ex -> release(ex) }) + finalizer: suspend (ExitCase) -> Unit +): A = bracketCase({ Unit }, { fa.invoke() }, { _, ex -> finalizer(ex) }) +/** + * Meant for specifying tasks with safe resource acquisition and release in the face of errors and interruption. + * It would be the equivalent of an async capable `try/catch/finally` statements in mainstream imperative languages for resource + * acquisition and release. + * + * @param acquire the action to acquire the resource + * + * @param use is the action to consume the resource and produce a result. + * Once the resulting suspend program terminates, either successfully, error or disposed, + * the [release] function will run to clean up the resources. + * + * @param release is the action that's supposed to release the allocated resource after `use` is done, irregardless + * of its exit condition. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * + * class File(url: String) { + * fun open(): File = this + * fun close(): Unit {} + * override fun toString(): String = "This file contains some interesting content!" + * } + * + * suspend fun openFile(uri: String): File = File(uri).open() + * suspend fun closeFile(file: File): Unit = file.close() + * suspend fun fileToString(file: File): String = file.toString() + * + * suspend fun main(): Unit { + * //sampleStart + * val res = bracket( + * acquire = { openFile("data.json") }, + * use = { file -> fileToString(file) }, + * release = { file: File -> closeFile(file) } + * ) + * //sampleEnd + * println(res) + * } + * ``` + */ suspend fun bracket( acquire: suspend () -> A, use: suspend (A) -> B, release: suspend (A) -> Unit ): B = bracketCase(acquire, use, { a, _ -> release(a) }) +/** + * A way to safely acquire a resource and release in the face of errors and cancellation. + * It uses [ExitCase] to distinguish between different exit cases when releasing the acquired resource. + * + * [bracketCase] exists out of a three stages: + * 1. acquisition + * 2. consumption + * 3. releasing + * + * 1. Resource acquisition is **NON CANCELLABLE**. + * If resource acquisition fails, meaning no resource was actually successfully acquired then we short-circuit the effect. + * Reason being, we cannot [release] what we did not `acquire` first. Same reason we cannot call [use]. + * If it is successful we pass the result to stage 2 [use]. + * + * 2. Resource consumption is like any other `suspend` effect. The key difference here is that it's wired in such a way that + * [release] **will always** be called either on [ExitCase.Cancelled], [ExitCase.Failure] or [ExitCase.Completed]. + * If it failed than the resulting [suspend] from [bracketCase] will be the error, otherwise the result of [use]. + * + * 3. Resource releasing is **NON CANCELLABLE**, otherwise it could result in leaks. + * In the case it throws the resulting [suspend] will be either the error or a composed error if one occurred in the [use] stage. + * + * @param acquire the action to acquire the resource + * + * @param use is the action to consume the resource and produce a result. + * Once the resulting suspend program terminates, either successfully, error or disposed, + * the [release] function will run to clean up the resources. + * + * @param release the allocated resource after [use] terminates. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * + * class File(url: String) { + * fun open(): File = this + * fun close(): Unit {} + * } + * + * suspend fun File.content(): String = + * "This file contains some interesting content!" + * suspend fun openFile(uri: String): File = File(uri).open() + * suspend fun closeFile(file: File): Unit = file.close() + * + * suspend fun main(): Unit { + * //sampleStart + * val res = bracketCase( + * acquire = { openFile("data.json") }, + * use = { file -> file.content() }, + * release = { file, exitCase -> + * when (exitCase) { + * is ExitCase.Completed -> println("File closed with $exitCase") + * is ExitCase.Cancelled -> println("Program cancelled with $exitCase") + * is ExitCase.Failure -> println("Program failed with $exitCase") + * } + * closeFile(file) + * } + * ) + * //sampleEnd + * println(res) + * } + * ``` + */ suspend fun bracketCase( acquire: suspend () -> A, use: suspend (A) -> B, diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ConcurrentVar.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ConcurrentVar.kt index b37b56c52..9f196a7b5 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ConcurrentVar.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ConcurrentVar.kt @@ -6,7 +6,7 @@ import kotlinx.atomicfu.atomic /** * [ConcurrentVar] is a mutable concurrent safe variable which is either `empty` or contains a `single value` of type [A]. - * + * It behaves the same as a single element [arrow.fx.coroutines.stream.concurrent.Queue]. * When trying to [put] or [take], it'll suspend when it's respectively [isEmpty] or [isNotEmpty]. * * There are also operators that return immediately, [tryTake] & [tryPut], @@ -18,19 +18,12 @@ import kotlinx.atomicfu.atomic * ```kotlin:ank:playground * import arrow.fx.coroutines.* * - * suspend fun fibonacci(n: Int, prev: Int = 0, next: Int = 1): Int = - * when (n) { - * 0 -> prev - * 1 -> next - * else -> fibonacci(n - 1, next, prev + next) - * } - * * suspend fun main(): Unit { * val mvar = ConcurrentVar.empty() * * ForkConnected { - * val asyncFib = fibonacci(50) - * mvar.put(asyncFib) + * sleep(3.seconds) + * mvar.put(5) * } * * val r = mvar.take() // suspend until Fork puts result in MVar diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Fiber.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Fiber.kt index 93173b921..98b3213b0 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Fiber.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Fiber.kt @@ -35,8 +35,24 @@ internal fun Fiber(promise: UnsafePromise, conn: SuspendConnection): Fibe /** * Launches a new suspendable cancellable coroutine within a [Fiber]. - * It does so by connecting the created [Fiber]'s cancellation to the parent. - * If the parent gets cancelled, then this [Fiber] will also get cancelled. + * It does so by connecting the created [Fiber]'s cancellation to the callers `suspend` scope. + * If the caller of `ForkConnected` gets cancelled, then this [Fiber] will also get cancelled. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * + * suspend fun main(): Unit { + * val parent = ForkConnected { + * ForkConnected { // cancellation connected to parent + * onCancel({ never() }) { + * println("I got cancelled by my parent") + * } + * } + * } + * sleep(1.seconds) + * parent.cancel() + * } + * ``` * * You can [Fiber.join] or [Fiber.cancel] the computation. * Cancelling this [Fiber] **will not** cancel its parent. @@ -53,6 +69,7 @@ suspend fun ForkConnected(ctx: CoroutineContext = ComputationPool, f: suspen Fiber(promise, conn2) } +/** @see ForkConnected **/ suspend fun (suspend () -> A).forkConnected(ctx: CoroutineContext = ComputationPool): Fiber = ForkConnected(ctx, this) @@ -107,6 +124,7 @@ suspend fun ForkScoped( Fiber(promise, conn2) } +/** @see ForkScoped */ suspend fun (suspend () -> A).forkScoped( ctx: CoroutineContext = ComputationPool, interruptWhen: suspend () -> Unit @@ -117,10 +135,14 @@ suspend fun (suspend () -> A).forkScoped( * You can [Fiber.join] or [Fiber.cancel] the computation. * * **BEWARE** you immediately leak the [Fiber] when launching without connection control. - * Use [ForkConnected] or safely launch the fiber as a [Resource] or using [Fiber]. + * Use [ForkConnected] or safely launch the fiber as a [Resource] or using [bracketCase]. * * @see ForkConnected for a fork operation that wires cancellation to its parent in a safe way. */ +suspend fun ForkAndForget(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber = + f.forkAndForget(ctx) + +/** @see ForkAndForget */ suspend fun (suspend () -> A).forkAndForget(ctx: CoroutineContext = ComputationPool): Fiber { val promise = UnsafePromise() // A new SuspendConnection, because its cancellation is now decoupled from our current one. @@ -128,6 +150,3 @@ suspend fun (suspend () -> A).forkAndForget(ctx: CoroutineContext = Computat startCoroutineCancellable(CancellableContinuation(ctx, conn, promise::complete)) return Fiber(promise, conn) } - -suspend fun ForkAndForget(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber = - f.forkAndForget(ctx) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTraverse.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTraverse.kt index 5d4b8ef57..b85ed85b7 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTraverse.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTraverse.kt @@ -84,6 +84,23 @@ suspend fun Iterable.parTraverse(f: suspend (A) -> B): List = * Traverse this [Iterable] and and run all mappers [f] on [CoroutineContext]. * Cancelling this operation cancels all running tasks. * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * + * data class User(val id: Int) + * + * suspend fun main(): Unit { + * //sampleStart + * suspend fun getUserById(id: Int): User = + * User(id) + * + * val res = listOf(1, 2, 3) + * .parTraverse(ComputationPool, ::getUserById) + * //sampleEnd + * println(res) + * } + * ``` + * * **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext]. * We ensure they start in sequence so it's guaranteed to finish on a single threaded context. * diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTupledN.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTupledN.kt index 169cd69bf..0bcfd3831 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTupledN.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTupledN.kt @@ -13,6 +13,27 @@ import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED * Parallel maps [fa], [fb] in parallel on [ComputationPool]. * Cancelling this operation cancels both operations running in parallel. * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * + * suspend fun main(): Unit { + * //sampleStart + * val result = parMapN( + * { "First one is on ${Thread.currentThread().name}" }, + * { "Second one is on ${Thread.currentThread().name}" } + * ) { (a, b) -> + * "$a\n$b" + * } + * //sampleEnd + * println(result) + * } + * ``` + * + * @param fa value to parallel map + * @param fb value to parallel map + * @param f function to map/combine value [A] and [B] + * ``` + * * @see parMapN for the same function that can race on any [CoroutineContext]. */ suspend fun parMapN(fa: suspend () -> A, fb: suspend () -> B, f: (Pair) -> C): C = diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Promise.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Promise.kt index e9ab12062..939e2ec6c 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Promise.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Promise.kt @@ -10,7 +10,28 @@ import kotlinx.atomicfu.atomic /** * When made, a [Promise] is empty. Until it is fulfilled, which can only happen once. * - * A [Promise] that a value of type [A] will be provided later. + * A `Promise` is commonly used to provide and receive a value from 2 different threads, + * since `Promise` can only be completed once unlike [ConcurrentVar] we can consider it a synchronization primitive. + * + * Let's say we wanted to await a `Fiber`, we could complete a Promise `latch` to signal it finished. + * Awaiting the latch `Promise` will now prevent `main` from finishing early. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * + * suspend fun main(): Unit { + * val await = Promise() + * + * ForkConnected { + * println("Fiber starting up!") + * sleep(3.seconds) + * println("Fiber finished!") + * await.complete(Unit) + * } + * + * await.get() // Suspend until fiber finishes + * } + * ``` */ interface Promise { @@ -79,8 +100,11 @@ interface Promise { suspend fun complete(a: A): Either companion object { - fun unsafe(): Promise = DefaultPromise() - suspend operator fun invoke(): Promise = unsafe() + fun unsafe(): Promise = + DefaultPromise() + + suspend operator fun invoke(): Promise = + unsafe() } object AlreadyFulfilled { diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Race2.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Race2.kt index 47621e608..575f70ca5 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Race2.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Race2.kt @@ -13,7 +13,34 @@ import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED * Races the participants [fa], [fb] in parallel on the [ComputationPool]. * The winner of the race cancels the other participants, * cancelling the operation cancels all participants. + * An [uncancellable] participants will back-pressure the result of [raceN]. * + * ```kotlin:ank:playground + * import arrow.core.Either + * import arrow.fx.coroutines.* + * + * suspend fun main(): Unit { + * suspend fun loser(): Int = + * cancellable { callback -> + * // Wait forever and never complete callback + * CancelToken { println("Never got cancelled for losing.") } + * } + * + * val winner = raceN({ loser() }, { 5 }) + * + * val res = when(winner) { + * is Either.Left -> "Never always loses race" + * is Either.Right -> "Race was won with ${winner.b}" + * } + * //sampleEnd + * println(res) + * } + * ``` + * + * @param fa task to participate in the race + * @param fb task to participate in the race + * @return either [Either.Left] if [fa] won the race, or [Either.Right] if [fb] won the race. + * @see racePair for a version that does not automatically cancel the loser. * @see raceN for the same function that can race on any [CoroutineContext]. */ suspend fun raceN(fa: suspend () -> A, fb: suspend () -> B): Either = @@ -27,6 +54,32 @@ suspend fun raceN(fa: suspend () -> A, fb: suspend () -> B): Either * **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext]. * We ensure they start in sequence so it's guaranteed to finish on a single threaded context. * + * ```kotlin:ank:playground + * import arrow.core.Either + * import arrow.fx.coroutines.* + * + * suspend fun main(): Unit { + * suspend fun loser(): Int = + * cancellable { callback -> + * // Wait forever and never complete callback + * CancelToken { println("Never got cancelled for losing.") } + * } + * + * val winner = raceN(IOPool, { loser() }, { 5 }) + * + * val res = when(winner) { + * is Either.Left -> "Never always loses race" + * is Either.Right -> "Race was won with ${winner.b}" + * } + * //sampleEnd + * println(res) + * } + * ``` + * + * @param fa task to participate in the race + * @param fb task to participate in the race + * @return either [Either.Left] if [fa] won the race, or [Either.Right] if [fb] won the race. + * @see racePair for a version that does not automatically cancel the loser. * @see raceN for a function that ensures it runs in parallel on the [ComputationPool]. */ suspend fun raceN(ctx: CoroutineContext, fa: suspend () -> A, fb: suspend () -> B): Either { diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Resource.kt index 91b0c3fcb..7b4ed122e 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Resource.kt @@ -1,6 +1,9 @@ package arrow.fx.coroutines import arrow.core.Either +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import kotlin.coroutines.CoroutineContext /** * [Resource] models resource allocation and releasing. It is especially useful when multiple resources that depend on each other @@ -42,7 +45,7 @@ import arrow.core.Either * closeDBHandle(handle) * closeConsumer(consumer) * } - * // sampleEnd + * //sampleEnd * suspend fun main(): Unit = program.invoke() * ``` * Here we are creating and then using a service that has a dependency on two resources: A database handle and a consumer of some sort. All three resources need to be closed in the correct order at the end. @@ -79,7 +82,7 @@ import arrow.core.Either * Unit * } * } - * // sampleEnd + * //sampleEnd * * suspend fun main(): Unit = resourceProgram.invoke() * ``` @@ -209,6 +212,65 @@ sealed class Resource { fun defer(f: suspend () -> Resource): Resource = Resource.Defer(f) + /** + * Creates a single threaded [CoroutineContext] as a [Resource]. + * Upon release an orderly shutdown of the [ExecutorService] takes place in which previously submitted + * tasks are executed, but no new tasks will be accepted. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * import java.util.concurrent.Executors + * import java.util.concurrent.atomic.AtomicInteger + * import kotlin.math.max + * + * suspend fun main(): Unit { + * val pool = Resource.fromExecutor { + * val ctr = AtomicInteger(0) + * val size = max(2, Runtime.getRuntime().availableProcessors()) + * Executors.newFixedThreadPool(size) { r -> + * Thread(r, "computation-${ctr.getAndIncrement()}") + * .apply { isDaemon = true } + * } + * } + * + * pool.use { ctx -> + * listOf(1, 2, 3, 4, 5).parTraverse(ctx) { i -> + * println("#$i running on ${Thread.currentThread().name}") + * } + * } + * } + * ``` + */ + fun fromExecutor(f: suspend () -> ExecutorService): Resource = + Resource(f) { s -> s.shutdown() }.map(ExecutorService::asCoroutineContext) + + /** + * Creates a single threaded [CoroutineContext] as a [Resource]. + * Upon release an orderly shutdown of the [ExecutorService] takes place in which previously submitted + * tasks are executed, but no new tasks will be accepted. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * + * val singleCtx = Resource.singleThreadContext("single") + * + * suspend fun main(): Unit = + * singleCtx.use { ctx -> + * evalOn(ctx) { + * println("I am running on ${Thread.currentThread().name}") + * } + * } + * ``` + */ + fun singleThreadContext(name: String): Resource = + fromExecutor { + Executors.newSingleThreadExecutor { r -> + Thread(r, name).apply { + isDaemon = true + } + } + } + @Suppress("UNCHECKED_CAST") fun tailRecM(a: A, f: (A) -> Resource>): Resource { fun loop(r: Resource>): Resource = when (r) { diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Semaphore.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Semaphore.kt index f5e99b095..c6e44e0b5 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Semaphore.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Semaphore.kt @@ -3,13 +3,73 @@ package arrow.fx.coroutines import arrow.core.Either /** - * A counting [Semaphore] has a non-negative number of permits available. + * A counting [Semaphore] has a non-negative number of permits available. + * It's used to track how many permits are in-use, + * and to automatically await a number of permits to become available. + * * Acquiring permits decreases the available permits, and releasing increases the available permits * * Acquiring permits when there aren't enough available will suspend the acquire call * until the requested become available. Note that acquires are satisfied in strict FIFO order. + * The suspending acquire calls are cancellable, and will release any already acquired permits. + * + * Let's say we want to guarantee mutually exclusiveness, we can use a `Semaphore` with a single permit. + * Having a `Semaphore` with a single permit, we can track that only a single context can access something. + * + * ```kotlin:ank:playground + * //sampleStart + * import arrow.fx.coroutines.* + * import java.util.concurrent.atomic.AtomicInteger + * + * /* Only allwos single accesor */ + * class PreciousFile(private val accesors: AtomicInteger = AtomicInteger(0)) { + * fun use(): Unit { + * check(accesors.incrementAndGet() == 1) { "File accessed before released" } + * check(accesors.decrementAndGet() == 0) { "File accessed before released" } + * } + * } + * + * suspend fun main() { + * val file = PreciousFile() + * val mutex = Semaphore(1) + * + * (0 until 100).parTraverse(IOPool) { i -> + * mutex.withPermit { + * val res = file.use() + * println("$i accessed PreciousFile on ${Thread.currentThread().name}") + * } + * } + * //sampleEnd + * } + * ``` + * + * By wrapping our operation in `withPermit` we ensure that our `var count: Int` is only updated by a single thread at the same time. + * If we wouldn't protect our `PreciousFile` from being access by only a single thread at the same time, then it'll blow up our program. + * + * This is a common use-case when you need to write to a single `File` from different threads, since concurrent writes could result in inconsistent state. + * + * `Semaphore` is more powerful besides just modelling mutally exlusiveness, + * since it's allows to track any amount of permits. + * You can also use it to limit amount of parallel tasks, for example when using `parTraverse` we might want to limit how many tasks are running effectively in parallel. + * + * ```kotlin:ank:playground + * suspend fun heavyProcess(i: Int): Unit { + * println("Started job $i") + * sleep(250.milliseconds) + * println("Finished job $i") + * } + * + * suspend fun main(): Unit { + * val limit = 3 + * val semaphore = Semaphore(3) + * (0..50).parTraverse { i -> + * semaphore.withPermit { heavyProcess(i) } + * } + * } + * ``` * - * The blocking acquire calls are cancellable, and will release any already acquired permits. + * Here we set a limit of `3` to ensure that only 3 `heavyProcess` are running at the same time. + * This can ensure we don't stress the JVM too hard, OOM or worse. */ interface Semaphore { diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt index 690c4396f..8421a804d 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt @@ -91,20 +91,24 @@ suspend fun cancellable(cb: ((Result) -> Unit) -> CancelToken): A = * import arrow.core.* * import arrow.fx.coroutines.* * import java.lang.RuntimeException + * import kotlin.coroutines.Continuation + * import kotlin.coroutines.EmptyCoroutineContext + * import kotlin.coroutines.startCoroutine * * typealias Callback = (List?, Throwable?) -> Unit * * class GithubId * object GithubService { - * private val listeners: MutableMap = mutableMapOf() + * private val listeners: MutableMap> = mutableMapOf() * suspend fun getUsernames(callback: Callback): GithubId { * val id = GithubId() - * listeners[id] = callback - * ForkConnected { sleep(2.seconds); callback(listOf("Arrow"), null) } + * val fiber = ForkConnected { sleep(2.seconds); callback(listOf("Arrow"), null) } + * listeners[id] = fiber * return id * } - * * fun unregisterCallback(id: GithubId): Unit { + * suspend { listeners[id]?.cancel() } + * .startCoroutine(Continuation(EmptyCoroutineContext) { }) // Launch and forget * listeners.remove(id) * } * } @@ -120,10 +124,8 @@ suspend fun cancellable(cb: ((Result) -> Unit) -> CancelToken): A = * else -> cb(Result.failure(RuntimeException("Null result and no exception"))) * } * } - * * CancelToken { GithubService.unregisterCallback(id) } * } - * * val result = getUsernames() * //sampleEnd * println(result) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/cancelBoundary.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/cancelBoundary.kt index fd2a4d200..c23a14bfb 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/cancelBoundary.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/cancelBoundary.kt @@ -4,8 +4,33 @@ import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn /** - * Checks for cancellation, - * when cancellation occurs this coroutine will suspend indefinitely and never continue. + * Inserts a cancellable boundary + * + * In a cancelable environment, we need to add mechanisms to react when cancellation is triggered. + * In a coroutine, a cancel boundary checks for the cancellation status; and, it does not allow the coroutine to keep executing in the case cancellation was triggered. + * Useful, for example, to cancel the continuation of a loop, as shown in this code snippet: + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * + * //sampleStart + * suspend fun forever(): Unit { + * while(true) { + * println("I am getting dizzy...") + * cancelBoundary() // cancellable computation loop + * } + * } + * + * suspend fun main(): Unit { + * val fiber = ForkConnected { + * guaranteeCase({ forever() }) { exitCase -> + * println("forever finished with $exitCase") + * } + * } + * sleep(10.milliseconds) + * fiber.cancel() + * } + * ``` */ suspend fun cancelBoundary(): Unit = suspendCoroutineUninterceptedOrReturn { cont -> diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/dispatchers.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/dispatchers.kt index 6e3714509..2dd83ceb8 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/dispatchers.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/dispatchers.kt @@ -9,35 +9,41 @@ import kotlin.coroutines.Continuation import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext -fun fromExecutor(f: suspend () -> ExecutorService): Resource = - Resource(f) { s -> s.shutdown() }.map(ExecutorService::asCoroutineContext) - -fun singleThreadContext(name: String): Resource = - fromExecutor { - Executors.newSingleThreadExecutor { r -> - Thread(r, name).apply { - isDaemon = true - } - } - } - -val ComputationPool: CoroutineContext = ForkJoinPool().asCoroutineContext() +/** + * A [CoroutineContext] to run non-blocking suspending code, + * all code that relies on blocking IO should prefer to use an unbounded [IOPool]. + * + * A work-stealing thread pool using all available processors as its target parallelism level. + */ +val ComputationPool: CoroutineContext = + ForkJoinPool().asCoroutineContext() private object IOCounter { private val ref = atomic(0) fun getAndIncrement(): Int = ref.getAndIncrement() } -val IOPool = Executors.newCachedThreadPool { r -> - Thread(r).apply { - name = "io-arrow-kt-worker-${IOCounter.getAndIncrement()}" - isDaemon = true - } -}.asCoroutineContext() +/** + * Creates a thread pool that creates new threads as needed, but + * will reuse previously constructed threads when they are available, and uses the provided. + * + * This pool is prone to cause [OutOfMemoryError] since the pool size is unbounded. + */ +val IOPool: CoroutineContext = + Executors.newCachedThreadPool { r -> + Thread(r).apply { + name = "io-arrow-kt-worker-${IOCounter.getAndIncrement()}" + isDaemon = true + } + }.asCoroutineContext() -private fun ExecutorService.asCoroutineContext(): CoroutineContext = +internal fun ExecutorService.asCoroutineContext(): CoroutineContext = ExecutorServiceContext(this) +/** + * Wraps an [ExecutorService] in a [CoroutineContext] as a [ContinuationInterceptor] + * scheduling on the [ExecutorService] when [kotlin.coroutines.intrinsics.intercepted] is called. + */ private class ExecutorServiceContext(val pool: ExecutorService) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { override fun interceptContinuation(continuation: Continuation): Continuation = @@ -47,6 +53,7 @@ private class ExecutorServiceContext(val pool: ExecutorService) : }) } +/** Wrap existing continuation to resumes itself on the provided [ExecutorService] */ private class ExecutorServiceContinuation(val pool: ExecutorService, val cont: Continuation) : Continuation { override val context: CoroutineContext = cont.context diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt index 55eb40ea9..aeb223d28 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt @@ -6,10 +6,11 @@ internal inline infix fun ((A) -> B).andThen(crossinline f: (B) -> C): internal inline infix fun (suspend (A) -> B).andThen(crossinline f: suspend (B) -> C): suspend (A) -> C = { a: A -> f(this(a)) } -infix fun A.prependTo(fa: Iterable): List = +@PublishedApi +internal infix fun A.prependTo(fa: Iterable): List = listOf(this) + fa -fun Iterable.deleteFirst(f: (A) -> Boolean): Pair>? { +internal fun Iterable.deleteFirst(f: (A) -> Boolean): Pair>? { tailrec fun go(rem: Iterable, acc: List): Pair>? = when { rem.isEmpty() -> null @@ -24,13 +25,13 @@ fun Iterable.deleteFirst(f: (A) -> Boolean): Pair>? { return go(this, emptyList()) } -fun Iterable.uncons(): Pair>? = +internal fun Iterable.uncons(): Pair>? = firstOrNull()?.let { Pair(it, drop(1)) } -fun Iterable<*>.isEmpty(): Boolean = +internal fun Iterable<*>.isEmpty(): Boolean = size() == 0 -fun Iterable<*>.size(): Int = +internal fun Iterable<*>.size(): Int = when (this) { is Collection -> size else -> fold(0) { acc, _ -> acc + 1 } diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index ae3790748..fadf3d68c 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -55,25 +55,29 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. * If cancellation signal is received while [cb] is running, then the [CancelToken] will be triggered as soon as it's returned. * * ```kotlin:ank:playground - * import arrow.fx.coroutines.CancelToken + * import arrow.fx.coroutines.* * import arrow.fx.coroutines.stream.* * import java.lang.RuntimeException * import java.util.concurrent.Executors - * import java.util.concurrent.ScheduledFuture - * import java.util.concurrent.TimeUnit + * import java.util.concurrent.Future * * typealias Callback = (List?, Throwable?) -> Unit * * class GithubId * object GithubService { - * private val listeners: MutableMap> = mutableMapOf() + * private val listeners: MutableMap> = mutableMapOf() * fun getUsernames(callback: Callback): GithubId { * val id = GithubId() - * val future = Executors.newScheduledThreadPool(1).run { - * var count = 0 - * scheduleAtFixedRate({ - * callback(listOf("Arrow - ${count++}"), null) - * }, 0, 500, TimeUnit.MILLISECONDS) + * val future = Executors.newSingleThreadExecutor().run { + * submit { + * Thread.sleep(300) + * callback(listOf("Arrow - 1"), null) + * Thread.sleep(300) + * callback(listOf("Arrow - 2"), null) + * Thread.sleep(300) + * callback(listOf("Arrow - 3"), null) + * shutdown() + * } * } * listeners[id] = future * return id @@ -88,7 +92,7 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. * suspend fun main(): Unit { * //sampleStart * fun getUsernames(): Stream = - * Stream.cancellable { + * Stream.cancellable { * val id = GithubService.getUsernames { names, throwable -> * when { * names != null -> emit(names) @@ -96,15 +100,13 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. * else -> throw RuntimeException("Null result and no exception") * } * } - * * CancelToken { GithubService.unregisterCallback(id) } - * } + * }.take(3) * * val result = getUsernames() - * .take(3) + * .effectTap { println(it) } * .compile() * .toList() - * * //sampleEnd * println(result) * } diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/predef.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/predef.kt index 8c4474859..77b993860 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/predef.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/predef.kt @@ -8,7 +8,7 @@ internal fun checkBounds(arraySize: Int, offset: Int, length: Int) { } /** The iterator which produces no values. */ -val empty: Iterator = object : Iterator { +internal val empty: Iterator = object : Iterator { override fun hasNext(): Boolean = false @@ -16,7 +16,7 @@ val empty: Iterator = object : Iterator { throw NoSuchElementException("next on empty iterator") } -fun Iterator.flatMap(f: (A) -> Iterator): Iterator = +internal fun Iterator.flatMap(f: (A) -> Iterator): Iterator = object : Iterator { private var cur: Iterator = empty diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/CancelBoundary.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/CancelBoundary.kt new file mode 100644 index 000000000..5712436c5 --- /dev/null +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/CancelBoundary.kt @@ -0,0 +1,29 @@ +package arrow.fx.coroutines + +import io.kotest.core.spec.style.StringSpec +import io.kotest.matchers.shouldBe + +class CancelBoundary : StringSpec({ + + suspend fun forever(): Unit { + while (true) { + println("I am getting dizzy...") + cancelBoundary() // cancellable computation loop + } + } + + "endless loop can be cancelled if it includes a boundary" { + val latch = Promise() + val exit = Promise() + val f = ForkConnected { + guaranteeCase({ + latch.complete(Unit) + forever() + }, { ec -> exit.complete(ec) }) + } + latch.get() + f.cancel() + exit.get() shouldBe ExitCase.Cancelled + sleep(1.seconds) + } +}) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/FiberTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/FiberTest.kt index cd47a7ff9..4c64450ad 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/FiberTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/FiberTest.kt @@ -10,7 +10,7 @@ class FiberTest : ArrowFxSpec(spec = { "ForkConnected returns on the original context" { val forkCtxName = "forkCtx" - val forker = singleThreadContext(forkCtxName) + val forker = Resource.singleThreadContext(forkCtxName) checkAll(Arb.int()) { i -> single.zip(forker).use { (single, forker) -> evalOn(single) { @@ -28,7 +28,7 @@ class FiberTest : ArrowFxSpec(spec = { "ForkConnected returns on the original context on failure" { val forkCtxName = "forkCtx" - val forker = singleThreadContext(forkCtxName) + val forker = Resource.singleThreadContext(forkCtxName) checkAll(Arb.throwable()) { e -> single.zip(forker).use { (single, forker) -> evalOn(single) { @@ -127,7 +127,7 @@ class FiberTest : ArrowFxSpec(spec = { "ForkScoped returns on the original context" { val forkCtxName = "forkCtx" - val forker = singleThreadContext(forkCtxName) + val forker = Resource.singleThreadContext(forkCtxName) checkAll(Arb.int()) { i -> single.zip(forker).use { (single, forker) -> @@ -211,7 +211,7 @@ class FiberTest : ArrowFxSpec(spec = { "ForkAndForget returns on the original context" { val forkCtxName = "forkCtx" - val forker = singleThreadContext(forkCtxName) + val forker = Resource.singleThreadContext(forkCtxName) checkAll(Arb.int()) { i -> single.zip(forker).use { (single, forker) -> diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/GuaranteeCaseTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/GuaranteeCaseTest.kt index 06f661f6b..4090b8a9d 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/GuaranteeCaseTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/GuaranteeCaseTest.kt @@ -4,7 +4,6 @@ import arrow.core.Either import io.kotest.matchers.shouldBe import io.kotest.property.Arb import io.kotest.property.arbitrary.int -import io.kotest.property.checkAll class GuaranteeCaseTest : ArrowFxSpec(spec = { @@ -14,7 +13,7 @@ class GuaranteeCaseTest : ArrowFxSpec(spec = { val res = guaranteeCase( fa = { i }, - release = { ex -> p.complete(ex) } + finalizer = { ex -> p.complete(ex) } ) p.get() shouldBe ExitCase.Completed @@ -28,7 +27,7 @@ class GuaranteeCaseTest : ArrowFxSpec(spec = { val attempted = Either.catch { guaranteeCase( fa = { throw e }, - release = { ex -> p.complete(ex) } + finalizer = { ex -> p.complete(ex) } ) } @@ -47,7 +46,7 @@ class GuaranteeCaseTest : ArrowFxSpec(spec = { start.complete(Unit) never() }, - release = { ex -> p.complete(ex) } + finalizer = { ex -> p.complete(ex) } ) } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/ParTupledNTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/ParTupledNTest.kt index 5db3a301e..0996e0f1b 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/ParTupledNTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/ParTupledNTest.kt @@ -13,7 +13,7 @@ class ParTupledNTest : ArrowFxSpec(spec = { "parTupledN 2 returns to original context" { val mapCtxName = "parTupled2" - val mapCtx = fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { mapCtxName }) } + val mapCtx = Resource.fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { mapCtxName }) } checkAll { single.zip(mapCtx).use { (single, mapCtx) -> @@ -32,7 +32,7 @@ class ParTupledNTest : ArrowFxSpec(spec = { "parTupledN 2 returns to original context on failure" { val mapCtxName = "parTupled2" - val mapCtx = fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { mapCtxName }) } + val mapCtx = Resource.fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { mapCtxName }) } checkAll(Arb.int(1..2), Arb.throwable()) { choose, e -> single.zip(mapCtx).use { (single, mapCtx) -> @@ -123,7 +123,7 @@ class ParTupledNTest : ArrowFxSpec(spec = { "parTupledN 3 returns to original context" { val mapCtxName = "parTupled3" - val mapCtx = fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { mapCtxName }) } + val mapCtx = Resource.fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { mapCtxName }) } checkAll { single.zip(mapCtx).use { (single, mapCtx) -> @@ -143,7 +143,7 @@ class ParTupledNTest : ArrowFxSpec(spec = { "parTupledN 3 returns to original context on failure" { val mapCtxName = "parTupled3" - val mapCtx = fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { mapCtxName }) } + val mapCtx = Resource.fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { mapCtxName }) } checkAll(Arb.int(1..3), Arb.throwable()) { choose, e -> single.zip(mapCtx).use { (single, mapCtx) -> diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/PredefTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/PredefTest.kt index 9cb94fcb0..5f14320e6 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/PredefTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/PredefTest.kt @@ -47,8 +47,8 @@ class PredefTest : ArrowFxSpec(spec = { checkAll(Arb.string(), Arb.string()) { a, b -> val t0 = threadName.invoke() - singleThreadContext(a) - .zip(singleThreadContext(b)) + Resource.singleThreadContext(a) + .zip(Resource.singleThreadContext(b)) .use { (ui, io) -> t0 shouldBe threadName.invoke() diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RaceNTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RaceNTest.kt index 8da82ac53..af5590fab 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RaceNTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RaceNTest.kt @@ -15,7 +15,7 @@ class RaceNTest : ArrowFxSpec(spec = { "race2 returns to original context" { val racerName = "race2" - val racer = fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { racerName }) } + val racer = Resource.fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { racerName }) } checkAll(Arb.int(1..2)) { choose -> single.zip(racer).use { (single, raceCtx) -> @@ -36,7 +36,7 @@ class RaceNTest : ArrowFxSpec(spec = { "race2 returns to original context on failure" { val racerName = "race2" - val racer = fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { racerName }) } + val racer = Resource.fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { racerName }) } checkAll(Arb.int(1..2), Arb.throwable()) { choose, e -> single.zip(racer).use { (single, raceCtx) -> @@ -116,7 +116,7 @@ class RaceNTest : ArrowFxSpec(spec = { "race3 returns to original context" { val racerName = "race3" - val racer = fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { racerName }) } + val racer = Resource.fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { racerName }) } checkAll(Arb.int(1..3)) { choose -> single.zip(racer).use { (single, raceCtx) -> @@ -141,7 +141,7 @@ class RaceNTest : ArrowFxSpec(spec = { "race3 returns to original context on failure" { val racerName = "race3" - val racer = fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { racerName }) } + val racer = Resource.fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { racerName }) } checkAll(Arb.int(1..3), Arb.throwable()) { choose, e -> single.zip(racer).use { (single, raceCtx) -> diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RacePairTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RacePairTest.kt index 9cf526328..624732c64 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RacePairTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RacePairTest.kt @@ -13,7 +13,7 @@ class RacePairTest : ArrowFxSpec(spec = { "race pair returns to original context" { val racerName = "racePair" - val racer = fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { racerName }) } + val racer = Resource.fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { racerName }) } checkAll(Arb.int(1..2)) { choose -> single.zip(racer).use { (single, raceCtx) -> @@ -36,7 +36,7 @@ class RacePairTest : ArrowFxSpec(spec = { "race pair returns to original context on failure" { val racerName = "racePair" - val racer = fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { racerName }) } + val racer = Resource.fromExecutor { Executors.newFixedThreadPool(2, NamedThreadFactory { racerName }) } checkAll(Arb.int(1..2), Arb.throwable()) { choose, e -> single.zip(racer).use { (single, raceCtx) -> diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RaceTripleTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RaceTripleTest.kt index e7f5d77dc..249d16b0c 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RaceTripleTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/RaceTripleTest.kt @@ -11,7 +11,7 @@ class RaceTripleTest : ArrowFxSpec(spec = { "race triple returns to original context" { val racerName = "raceTriple" - val racer = fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { racerName }) } + val racer = Resource.fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { racerName }) } checkAll(Arb.int(1..3)) { choose -> single.zip(racer).use { (single, raceCtx) -> @@ -36,7 +36,7 @@ class RaceTripleTest : ArrowFxSpec(spec = { "race triple returns to original context on failure" { val racerName = "raceTriple" - val racer = fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { racerName }) } + val racer = Resource.fromExecutor { Executors.newFixedThreadPool(3, NamedThreadFactory { racerName }) } checkAll(Arb.int(1..3), Arb.throwable()) { choose, e -> single.zip(racer).use { (single, raceCtx) -> diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/TimerTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/TimerTest.kt index 2ab46c474..3815b0e5d 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/TimerTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/TimerTest.kt @@ -31,7 +31,7 @@ class TimerTest : ArrowFxSpec(spec = { "timeOutOrNull" - { "returns to original context without timing out" { - singleThreadContext("1").zip(single).use { (one, single) -> + Resource.singleThreadContext("1").zip(single).use { (one, single) -> checkAll { evalOn(single) { val n0 = threadName.invoke() @@ -49,7 +49,7 @@ class TimerTest : ArrowFxSpec(spec = { } "returns to original context when timing out" { - singleThreadContext("1").zip(single).use { (one, single) -> + Resource.singleThreadContext("1").zip(single).use { (one, single) -> checkAll { evalOn(single) { val n0 = threadName.invoke() @@ -67,7 +67,7 @@ class TimerTest : ArrowFxSpec(spec = { } "returns to original context on failure" { - singleThreadContext("1").zip(single).use { (one, single) -> + Resource.singleThreadContext("1").zip(single).use { (one, single) -> checkAll(Arb.throwable()) { e -> evalOn(single) { val n0 = threadName.invoke() @@ -87,7 +87,7 @@ class TimerTest : ArrowFxSpec(spec = { } "returns to original context on CancelToken failure" { - singleThreadContext("1").zip(single).use { (one, single) -> + Resource.singleThreadContext("1").zip(single).use { (one, single) -> checkAll(Arb.throwable()) { e -> evalOn(single) { val n0 = threadName.invoke() diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/predef-test.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/predef-test.kt index 65ae9ebec..9b1c7dfd3 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/predef-test.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/predef-test.kt @@ -39,7 +39,7 @@ data class SideEffect(var counter: Int = 0) { } val singleThreadName = "single" -val single = singleThreadContext(singleThreadName) +val single = Resource.singleThreadContext(singleThreadName) val threadName: suspend () -> String = { Thread.currentThread().name } @@ -69,7 +69,7 @@ suspend fun assertCancellable(f: suspend () -> Unit): Unit { start.complete(Unit) f() }, - release = { ex -> p.complete(ex) } + finalizer = { ex -> p.complete(ex) } ) } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CancellationTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CancellationTest.kt index 82b590916..491f9f484 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CancellationTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CancellationTest.kt @@ -70,7 +70,7 @@ private suspend fun assertCancellable(fa: (latch: Promise) -> Stream p.complete(ex) } + finalizer = { ex -> p.complete(ex) } ) } @@ -91,7 +91,7 @@ private suspend fun Stream.assertCancellable(): Unit { .compile() .drain() }, - release = { ex -> p.complete(ex) } + finalizer = { ex -> p.complete(ex) } ) } diff --git a/arrow-fx/src/main/kotlin/arrow/fx/Resource.kt b/arrow-fx/src/main/kotlin/arrow/fx/Resource.kt index 9ee5f5785..28004c9ba 100644 --- a/arrow-fx/src/main/kotlin/arrow/fx/Resource.kt +++ b/arrow-fx/src/main/kotlin/arrow/fx/Resource.kt @@ -61,7 +61,7 @@ inline fun ResourceOf.fix(): Resource = * !closeDBHandle(handle) * !closeConsumer(consumer) * } - * // sampleEnd + * //sampleEnd * * fun main() { * program.unsafeRunSync() @@ -99,7 +99,7 @@ inline fun ResourceOf.fix(): Resource = * } * } * } - * // sampleEnd + * //sampleEnd * * fun main() { * bracketProgram.unsafeRunSync() @@ -140,7 +140,7 @@ inline fun ResourceOf.fix(): Resource = * * IO.unit * }.fix() - * // sampleEnd + * //sampleEnd * * fun main() { * managedTProgram.unsafeRunSync()