Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Improve Arrow Fx Coroutines docs for release #248

Merged
merged 32 commits into from
Aug 15, 2020
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e55f195
Move and document Resource constructors
nomisRev Aug 7, 2020
7c2e0d7
Merge remote-tracking branch 'origin/master' into sv-docs-draft
nomisRev Aug 7, 2020
2bdb73e
Add docs to Bracket.kt
nomisRev Aug 10, 2020
1aff17a
Add example cancelBoundary
nomisRev Aug 10, 2020
f3095f5
Add docs dispatchers
nomisRev Aug 10, 2020
97ddd95
Clean-up docs Fiber
nomisRev Aug 10, 2020
5d95689
Add docs and example raceN
nomisRev Aug 10, 2020
fa6b1f8
Remove file level comment race2
nomisRev Aug 11, 2020
11e814d
Fix never ending example Stream.cancellable
nomisRev Aug 11, 2020
888bed5
Improve example cancellableF
nomisRev Aug 11, 2020
d01d4a9
Add example parTraverseN
nomisRev Aug 11, 2020
c09d315
Add example parMapN
nomisRev Aug 11, 2020
b9c8c0b
Formatting Promise.kt
nomisRev Aug 11, 2020
a862829
Make predef.kt internal
nomisRev Aug 11, 2020
0dbc605
Fix typo and missing import
nomisRev Aug 11, 2020
d9d9868
Update async & concurrent quick start page
nomisRev Aug 11, 2020
37f7bc7
Update ConcurrentVar docs
nomisRev Aug 11, 2020
2f5b425
Add synchronization example Promise
nomisRev Aug 11, 2020
adbebb8
Add top doc for Semaphore
nomisRev Aug 11, 2020
cb849b0
Fix Resource snippets
nomisRev Aug 11, 2020
da03222
Add missing imports to async
nomisRev Aug 11, 2020
f15510f
Fix snippets
nomisRev Aug 11, 2020
099ed2a
Merge branch 'master' into sv-docs-draft
nomisRev Aug 11, 2020
f744ecc
Clarify guarantee/guaranteeCase resource usage
nomisRev Aug 12, 2020
05471df
Clarify ForkConnected
nomisRev Aug 12, 2020
bc3fd3a
Merge branch 'sv-docs-draft' of github.com:arrow-kt/arrow-fx into sv-…
nomisRev Aug 12, 2020
977845f
Update arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Bracke…
nomisRev Aug 12, 2020
2e504ce
Address PR Review PhBastiani
nomisRev Aug 13, 2020
1d1575c
Merge branch 'sv-docs-draft' of github.com:arrow-kt/arrow-fx into sv-…
nomisRev Aug 13, 2020
0978455
Merge remote-tracking branch 'origin/master' into sv-docs-draft
nomisRev Aug 13, 2020
bfc7243
Update cancelBoundary docs with suggestion PhBastiani
nomisRev Aug 15, 2020
3d03b07
Merge remote-tracking branch 'origin/master' into sv-docs-draft
nomisRev Aug 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
378 changes: 231 additions & 147 deletions arrow-docs/docs/fx/async/README.md

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions arrow-fx-coroutines/README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,11 @@ tailrec suspend fun sleeper(): Unit {
This also means that our new sleep can back-pressure `timeOutOrNull`.

```kotlin:ank
suspend main(): Unit {
import arrow.fx.coroutines.*

suspend fun main(): Unit {
val r = timeOutOrNull(1.seconds) {
uncancelable { sleep(2.seconds) }
uncancellable { sleep(2.seconds) }
} // r is null, but took 2 seconds.
}
```
Expand Down Expand Up @@ -306,7 +308,7 @@ val resources: List<Resource<File>> =
val resource: Resource<List<File>> =
resources.sequence(Resource.applicative())

suspend main(): Unit {
suspend fun main(): Unit {
resource.use { files ->
files.parTraverse(IODispatchers.IOPool) { file ->
file.toString()
Expand All @@ -328,7 +330,7 @@ Simple constructs like `suspend fun Either.catch(f: () -> A): Either<Throwable,
A simple example might be to repeat an action `n` times, similar to the `repeat` function in the standard library.

```kotlin:ank
suspend main(): Unit {
suspend fun main(): Unit {
repeat(Schedule.recurs<A>(n)) {
println("Hello")
}
Expand All @@ -338,7 +340,7 @@ suspend main(): Unit {
Alternatively we can re-use this `Schedule` to `retry` a `suspend fun` `n` times when it fails.

```kotlin:ank
suspend main(): Unit {
suspend fun main(): Unit {
retry(Schedule.recurs<A>(n)) {
println("I am going to do nothing but throw a tantrum!")
throw RuntimeException("Boom!")
Expand All @@ -353,7 +355,7 @@ fun <A> schedule(): Schedule<A, List<A>> = Schedule {
(recurs<A>(10) and spaced(10.seconds)) zipRight collect()
}

suspend main(): Unit {
suspend fun main(): Unit {
var count = Atomic(0)
nomisRev marked this conversation as resolved.
Show resolved Hide resolved

val history: List<Int> = repeat(schedule<Int>()) {
Expand Down
146 changes: 142 additions & 4 deletions arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Bracket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ suspend fun <A> uncancellable(f: suspend () -> A): A =
}
}

/**
* Registers an [onCancel] handler after [fa].
* [onCancel] is guaranteed to be called in case of cancellation, otherwise it's ignored.
*
* Useful for wiring cancellation tokens between fibers, building inter-op with other effect systems or testing.
*
* @param fa program that you want to register handler on
* @param onCancel handler to run when [fa] gets cancelled.
* @see guarantee for registering a handler that is guaranteed to always run.
* @see guaranteeCase for registering a handler that executes for any [ExitCase].
*/
suspend fun <A> onCancel(
fa: suspend () -> A,
onCancel: suspend () -> Unit
Expand All @@ -66,22 +77,149 @@ suspend fun <A> onCancel(
}
}

/**
* Guarantees execution of a given [finalizer] after [fa] regardless of success, error or cancellation.
*
* As best practice, it's not a good idea to release resources via [guarantee].
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
* since [guarantee] doesn't properly model acquiring, using and releasing resources.
* It only models scheduling of a finalizer after a given suspending program,
* so you should prefer [Resource] or [bracket] which captures acquiring,
* using and releasing into 3 separate steps to ensure resource safety.
*
* @param fa program that you want to register handler on
* @param finalizer handler to run after [fa].
* @see guaranteeCase for registering a handler that tracks the [ExitCase] of [fa].
*/
suspend fun <A> guarantee(
fa: suspend () -> A,
release: suspend () -> Unit
): A = guaranteeCase(fa) { release.invoke() }
finalizer: suspend () -> Unit
): A = guaranteeCase(fa) { finalizer.invoke() }

/**
* Guarantees execution of a given [finalizer] after [fa] regardless of success, error or cancellation., allowing
* for differentiating between exit conditions with to the [ExitCase] argument of the finalizer.
*
* As best practice, it's not a good idea to release resources via [guaranteeCase].
* since [guaranteeCase] doesn't properly model acquiring, using and releasing resources.
* It only models scheduling of a finalizer after a given suspending program,
* so you should prefer [Resource] or [bracketCase] which captures acquiring,
* using and releasing into 3 separate steps to ensure resource safety.
*
* @param fa program that you want to register handler on
* @param finalizer handler to run after [fa].
* @see guarantee for registering a handler that ignores the [ExitCase] of [fa].
*/
suspend fun <A> guaranteeCase(
fa: suspend () -> A,
release: suspend (ExitCase) -> Unit
): A = bracketCase({ Unit }, { fa.invoke() }, { _, ex -> release(ex) })
finalizer: suspend (ExitCase) -> Unit
): A = bracketCase({ Unit }, { fa.invoke() }, { _, ex -> finalizer(ex) })

/**
* Meant for specifying tasks with safe resource acquisition and release in the face of errors and interruption.
* It would be the equivalent of an async capable `try/catch/finally` statements in mainstream imperative languages for resource
* acquisition and release.
*
* @param acquire the action to acquire the resource
*
* @param use is the action to consume the resource and produce a result.
* Once the resulting suspend program terminates, either successfully, error or disposed,
* the [release] function will run to clean up the resources.
*
* @param release is the action that's supposed to release the allocated resource after `use` is done, irregardless
* of its exit condition.
*
* ```kotlin:ank:playground
* import arrow.fx.IO
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
*
* class File(url: String) {
* fun open(): File = this
* fun close(): Unit {}
* override fun toString(): String = "This file contains some interesting content!"
* }
*
* suspend fun openFile(uri: String): File = File(uri).open()
* suspend fun closeFile(file: File): Unit = file.close()
* suspend fun fileToString(file: File): String = file.toString()
*
* suspend fun main(): Unit {
* //sampleStart
* val res = bracketCase(
* acquire = { openFile("data.json") },
* use = { file -> fileToString(file) }),
* release = { file: File -> closeFile(file) }
* )
* //sampleEnd
* println(res)
* }
* ```
*/
suspend fun <A, B> bracket(
acquire: suspend () -> A,
use: suspend (A) -> B,
release: suspend (A) -> Unit
): B = bracketCase(acquire, use, { a, _ -> release(a) })

/**
* A way to safely acquire a resource and release in the face of errors and cancellation.
* It uses [ExitCase] to distinguish between different exit cases when releasing the acquired resource.
*
* [bracketCase] exists out of a three stages:
* 1. acquisition
* 2. consumption
* 3. releasing
*
* 1. Resource acquisition is **NON CANCELLABLE**.
* If resource acquisition fails, meaning no resource was actually successfully acquired then we short-circuit the effect.
* Reason being, we cannot [release] what we did not `acquire` first. Same reason we cannot call [use].
* If it is successful we pass the result to stage 2 [use].
*
* 2. Resource consumption is like any other `suspend` effect. The key difference here is that it's wired in such a way that
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent docs and clarifications!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we repeat this in bracket, I am unsure how we can improve the documentation of these top-level functions.

API docs are a very low profile and have little to no styling. I'm not sure how or if we need to improve our API docs, or if we should promote certain functions in a different manner. parMapN, parTupledN, raceN.

* [release] **will always** be called either on [ExitCase.Cancelled], [ExitCase.Failure] or [ExitCase.Completed].
* If it failed than the resulting [suspend] from [bracketCase] will be the error, otherwise the result of [use].
*
* 3. Resource releasing is **NON CANCELLABLE**, otherwise it could result in leaks.
* In the case it throws the resulting [suspend] will be either the error or a composed error if one occurred in the [use] stage.
*
* @param acquire the action to acquire the resource
*
* @param use is the action to consume the resource and produce a result.
* Once the resulting suspend program terminates, either successfully, error or disposed,
* the [release] function will run to clean up the resources.
*
* @param release the allocated resource after [use] terminates.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
*
* class File(url: String) {
* fun open(): File = this
* fun close(): Unit {}
* }
*
* suspend fun File.content(): String =
* "This file contains some interesting content!"
* suspend fun openFile(uri: String): File = File(uri).open()
* suspend fun closeFile(file: File): Unit = file.close()
*
* suspend fun main(): Unit {
* //sampleStart
* val res = bracketCase(
* acquire = { openFile("data.json") },
* use = { file -> file.content() },
* release = { file, exitCase ->
* when (exitCase) {
* is ExitCase.Completed -> println("File closed with $exitCase")
* is ExitCase.Cancelled -> println("Program cancelled with $exitCase")
* is ExitCase.Failure -> println("Program failed with $exitCase")
* }
* closeFile(file)
* }
* )
* //sampleEnd
* println(res)
* }
* ```
*/
suspend fun <A, B> bracketCase(
acquire: suspend () -> A,
use: suspend (A) -> B,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import kotlinx.atomicfu.atomic

/**
* [ConcurrentVar] is a mutable concurrent safe variable which is either `empty` or contains a `single value` of type [A].
*
* It behaves the same as a single element [arrow.fx.coroutines.stream.concurrent.Queue].
* When trying to [put] or [take], it'll suspend when it's respectively [isEmpty] or [isNotEmpty].
*
* There are also operators that return immediately, [tryTake] & [tryPut],
Expand All @@ -18,19 +18,12 @@ import kotlinx.atomicfu.atomic
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
*
* suspend fun fibonacci(n: Int, prev: Int = 0, next: Int = 1): Int =
* when (n) {
* 0 -> prev
* 1 -> next
* else -> fibonacci(n - 1, next, prev + next)
* }
*
* suspend fun main(): Unit {
* val mvar = ConcurrentVar.empty<Int>()
*
* ForkConnected {
* val asyncFib = fibonacci(50)
* mvar.put(asyncFib)
* sleep(3.seconds)
* mvar.put(5)
* }
*
* val r = mvar.take() // suspend until Fork puts result in MVar
Expand Down
31 changes: 25 additions & 6 deletions arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Fiber.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,24 @@ internal fun <A> Fiber(promise: UnsafePromise<A>, conn: SuspendConnection): Fibe

/**
* Launches a new suspendable cancellable coroutine within a [Fiber].
* It does so by connecting the created [Fiber]'s cancellation to the parent.
* If the parent gets cancelled, then this [Fiber] will also get cancelled.
* It does so by connecting the created [Fiber]'s cancellation to the callers `suspend` scope.
* If the caller of `ForkConnected` gets cancelled, then this [Fiber] will also get cancelled.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
*
* suspend fun main(): Unit {
* val parent = ForkConnected {
* ForkConnected { // cancellation connected to parent
* onCancel({ never<Unit>() }) {
* println("I got cancelled by my parent")
* }
* }
* }
* sleep(1.seconds)
* parent.cancel()
* }
* ```
*
* You can [Fiber.join] or [Fiber.cancel] the computation.
* Cancelling this [Fiber] **will not** cancel its parent.
Expand All @@ -53,6 +69,7 @@ suspend fun <A> ForkConnected(ctx: CoroutineContext = ComputationPool, f: suspen
Fiber(promise, conn2)
}

/** @see ForkConnected **/
suspend fun <A> (suspend () -> A).forkConnected(ctx: CoroutineContext = ComputationPool): Fiber<A> =
ForkConnected(ctx, this)

Expand Down Expand Up @@ -107,6 +124,7 @@ suspend fun <A> ForkScoped(
Fiber(promise, conn2)
}

/** @see ForkScoped */
suspend fun <A> (suspend () -> A).forkScoped(
ctx: CoroutineContext = ComputationPool,
interruptWhen: suspend () -> Unit
Expand All @@ -117,17 +135,18 @@ suspend fun <A> (suspend () -> A).forkScoped(
* You can [Fiber.join] or [Fiber.cancel] the computation.
*
* **BEWARE** you immediately leak the [Fiber] when launching without connection control.
* Use [ForkConnected] or safely launch the fiber as a [Resource] or using [Fiber].
* Use [ForkConnected] or safely launch the fiber as a [Resource] or using [bracketCase].
*
* @see ForkConnected for a fork operation that wires cancellation to its parent in a safe way.
*/
suspend fun <A> ForkAndForget(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber<A> =
f.forkAndForget(ctx)

/** @see ForkAndForget */
suspend fun <A> (suspend () -> A).forkAndForget(ctx: CoroutineContext = ComputationPool): Fiber<A> {
val promise = UnsafePromise<A>()
// A new SuspendConnection, because its cancellation is now decoupled from our current one.
val conn = SuspendConnection()
startCoroutineCancellable(CancellableContinuation(ctx, conn, promise::complete))
return Fiber(promise, conn)
}

suspend fun <A> ForkAndForget(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber<A> =
f.forkAndForget(ctx)
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,23 @@ suspend fun <A, B> Iterable<A>.parTraverse(f: suspend (A) -> B): List<B> =
* Traverse this [Iterable] and and run all mappers [f] on [CoroutineContext].
* Cancelling this operation cancels all running tasks.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
*
* data class User(val id: Int)
*
* suspend fun main(): Unit {
* //sampleStart
* suspend fun getUserById(id: Int): User =
* User(id)
*
* val res = listOf(1, 2, 3)
* .parTraverse(ComputationPool, ::getUserById)
* //sampleEnd
* println(res)
* }
* ```
*
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,27 @@ import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
* Parallel maps [fa], [fb] in parallel on [ComputationPool].
* Cancelling this operation cancels both operations running in parallel.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
*
* suspend fun main(): Unit {
* //sampleStart
* val result = parMapN(
* { "First one is on ${Thread.currentThread().name}" },
* { "Second one is on ${Thread.currentThread().name}" }
* ) { (a, b) ->
* "$a\n$b"
* }
* //sampleEnd
* println(result)
* }
* ```
*
* @param fa value to parallel map
* @param fb value to parallel map
* @param f function to map/combine value [A] and [B]
* ```
*
* @see parMapN for the same function that can race on any [CoroutineContext].
*/
suspend fun <A, B, C> parMapN(fa: suspend () -> A, fb: suspend () -> B, f: (Pair<A, B>) -> C): C =
Expand Down
Loading