Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Enviroment #214

Merged
merged 12 commits into from
Jul 8, 2020
156 changes: 39 additions & 117 deletions arrow-docs/docs/fx/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ permalink: /fx/

# Arrow Fx. Typed FP for the masses

Arrow Fx is a next-generation Typed FP Effects Library that makes effectful and polymorphic programming first class in Kotlin, and acts as an extension to the Kotlin native suspend system.
Arrow Fx is a next-generation Typed FP Effects Library that makes tracked effectful programming first class in Kotlin,
JorgeCastilloPrz marked this conversation as resolved.
Show resolved Hide resolved
and is build on top of Kotlin's suspend system.

The library brings purity, referential transparency, and direct imperative syntax to typed FP in Kotlin, and is a fun and easy tool for creating Typed Pure Functional Programs.

Expand All @@ -16,7 +17,8 @@ Arrow Fx programs run unmodified in multiple supported frameworks and runtimes s

## Pure & Referentially Transparent Functions

A pure function is a function that consistently returns the same output when given the same input. Pure functions exhibit a deterministic behavior and cause no observable effects externally. We call this property referential transparency.
A pure function is a function that consistently returns the same output when given the same input.
Pure functions exhibit a deterministic behavior and cause no observable effects externally. We call this property referential transparency.

Referential transparency allows us to reason about the different pieces of our program in isolation.

Expand All @@ -26,6 +28,7 @@ To create a pure function in Kotlin, let's use the keyword `fun`:
//sampleStart
fun helloWorld(): String =
"Hello World"

//sampleEnd
fun main() {
println(helloWorld())
Expand Down Expand Up @@ -54,7 +57,9 @@ fun helloWorld(): String =
suspend fun sayHello(): Unit =
println(helloWorld())

sayHello()
fun main() {
sayHello()
}
//sampleEnd
```

Expand Down Expand Up @@ -87,158 +92,75 @@ suspend fun greet(): Unit {
}
```

#### `fx` composition

Side effects can be composed and turned into pure values in `fx` blocks.

#### Turning side effects into pure values with `effect`

`effect` wraps the effect and turns it into a pure value by lifting any `suspend () -> A` user-declared side effect into an `IO<A>` value.

```kotlin:ank:playground
import arrow.fx.IO
import arrow.fx.extensions.fx
//sampleStart
suspend fun sayHello(): Unit =
println("Hello World")

suspend fun sayGoodBye(): Unit =
println("Good bye World!")

fun greet(): IO<Nothing, Unit> =
IO.fx<Unit> {
val pureHello = effect { sayHello() }
val pureGoodBye = effect { sayGoodBye() }
}
//sampleEnd
fun main() {
println(greet())
}
```
## Executing effectful programs

When we capture suspended side effects as `IO` values with `effect`, we can pass them around and compose them until we are ready to apply the effects. At this point, nothing has happened because `greet` and `effect` are lazy values.
The `greet` program is ready to run as soon as the user is ready to commit to an execution strategy that is either `blocking` or `non-blocking`.
`blocking` execution strategies will block the current thread that's waiting for the program to yield a value, whereas `non-blocking` strategies will immediately return and perform the program's work without blocking the current thread.
nomisRev marked this conversation as resolved.
Show resolved Hide resolved

#### Applying side effects with `!effect`
Since both blocking and non-blocking execution scenarios perform side effects, we consider running effects as an `unsafe` operation.
JorgeCastilloPrz marked this conversation as resolved.
Show resolved Hide resolved

We apply side effects with the operator `!`. Once you purify a side effect with `effect`, you can extract its value in a non-blocking way. `!effect` takes the suspended side effects `sayHello()` and `sayGoodbye()` and ensures the continuation context controls them before they get a chance to be executed. This ensures our effect compositions are pure and referentially transparent, and will only run at the edge.
Arrow restricts the ability to run programs to extensions of the `Enviroment` type class.
nomisRev marked this conversation as resolved.
Show resolved Hide resolved

Note that running `greet()` in the previous example does not perform any effects because it returns a wrapped lazy value. Since invoking this function does not produce effects, we can be confident that `greet` is pure and referentially transparent, despite referring to the effects application.
Usage of `unsafe` is reserved for the end of the world and may be the only impure execution of a well-typed functional program.
nomisRev marked this conversation as resolved.
Show resolved Hide resolved

```kotlin:ank:playground
import arrow.fx.IO
import arrow.fx.extensions.fx
import arrow.fx.coroutines.*

//sampleStart
suspend fun sayHello(): Unit =
println("Hello World")

suspend fun sayGoodBye(): Unit =
println("Good bye World!")

fun greet(): IO<Nothing, Unit> =
IO.fx<Unit> {
!effect { sayHello() }
!effect { sayGoodBye() }
}
//sampleEnd
fun main() {
println(greet()) //greet is a pure IO program
suspend fun greet(): Unit {
sayHello()
sayGoodBye()
}
```

An attempt to run a side effect in an `fx` block not delimited by `effect` or `!effect`:

```kotlin
import arrow.fx.IO
import arrow.fx.extensions.fx
//sampleStart
suspend fun sayHello(): Unit =
println("Hello World")

suspend fun sayGoodBye(): Unit =
println("Good bye World!")

fun greet(): IO<Nothing, Unit> =
IO.fx<Unit> {
sayHello()
sayGoodBye()
}
fun main() { // The edge of our world
val env = Enviroment()
env.unsafeRunSync { greet() }
}
//sampleEnd
```

it also results in `javax.script.ScriptException: error: restricted suspending functions can only invoke member or extension suspending functions on their restricted coroutine scope` compilation error for `sayHello` and `sayGoodBye` calls.

Arrow enforces usage to be explicit about effects application.

#### Applying existing datatypes

Composition using regular datatypes such as `IO` is still possible within `fx` blocks just like `effect` blocks. In addition to `!`, you can also use the extension function `bind()` to execute them.
In cases where you can use `suspend` edge-points, you should always prefer to do so. I.e. Kotlin also offers a `suspend fun main`.

```kotlin:ank:playground
import arrow.fx.IO
import arrow.fx.extensions.fx
//sampleStart
fun sayInIO(s: String): IO<Nothing, Unit> =
IO { println(s) }

fun greet(): IO<Nothing, Unit> =
IO.fx<Unit> {
sayInIO("Hello World").bind()
}
//sampleEnd
fun main() {
println(greet()) //greet is a pure IO program
}
```

## Executing effectful programs

The `greet` program is ready to run as soon as the user is ready to commit to an execution strategy that is either `blocking` or `non-blocking`.
`blocking` execution strategies will block the current thread that's waiting for the program to yield a value, whereas `non-blocking` strategies will immediately return and perform the program's work without blocking the current thread.

Since both blocking and non-blocking execution scenarios perform side effects, we consider running effects as an `unsafe` operation.

Arrow restricts the ability to run programs to extensions of the `UnsafeRun` type class.
import arrow.fx.coroutines.*

Usage of `unsafe` is reserved for the end of the world and may be the only impure execution of a well-typed functional program.

```kotlin:ank:playground
import arrow.fx.IO
import arrow.unsafe
import arrow.fx.extensions.runBlocking
import arrow.fx.extensions.fx
//sampleStart
suspend fun sayHello(): Unit =
println("Hello World")

suspend fun sayGoodBye(): Unit =
println("Good bye World!")

fun greet(): IO<Nothing, Unit> =
IO.fx<Unit> {
!effect { sayHello() }
!effect { sayGoodBye() }
}

fun main() { // The edge of our world
unsafe { runBlocking { greet() } }
suspend fun greet(): Unit {
sayHello()
sayGoodBye()
}

//sampleStart
suspend fun main(): Unit = // The edge of our world
greet()
//sampleEnd
```

Arrow Fx emphasizes the guarantee that users understand when they are performing side effects in their program declaration.

Arrow Fx programs are not restricted to `IO` but, in fact, are polymorphic and would work unmodified in many useful runtimes like the ones we find in popular libraries such as KotlinX Coroutines `Deferred`, Rx2 `Observable`, Reactor framework `Flux`, and, in general, any third party data type that can model sync and async effect suspension. See [Issue 1281](https://github.com/arrow-kt/arrow/issues/1281), which tracks support for those frameworks, or reach out to us if you are interested in support for any other framework.

If you're not very familiar with Functional Programming, and you've made it this far, you may realize that, despite the buzzwords and some FP jargon, you already know how to use Arrow Fx in general. This is because Arrow Fx brings the most popular imperative style to effectful programs with few simple primitives for effect control and asynchronous programming.
Arrow Fx emphasizes the guarantee that users understand when they are performing side effects in their program declaration.

If you're not very familiar with Functional Programming, and you've made it this far, you may realize that, despite the buzzwords and some FP jargon, you already know how to use Arrow Fx in general.
This is because Arrow Fx brings the most popular imperative style to effectful programs with few simple primitives for effect control and asynchronous programming.

# Conclusion

Complementing the Kotlin Coroutines library, Arrow Fx adds an extra layer of safety to concurrent and asynchronous programming so you're well aware of where effects are localized in your apps.
It does this by empowering polymorphic programs that can be interpreted untouched, preserving the same declaration in multiple popular frameworks such as Rx2, Reactor, etc.
Arrow Fx offers an idiomatic way of doing effecfull FP with Kotlin's coroutine system.
It does this by providing support for cancellation, error handling, resource handling and all goodies you might be familiar with from other functional effect systems.

Arrow Fx offers direct style syntax and effect control without compromises, and removes the syntactic burden of type parametrization while still yielding programs that are pure, safe, and referentially transparent.

Despite some limitations of the Kotlin type system, like lack of Higher Kinded Types, the Kotlin language is excellent for encoding typed programs with a first-class imperative syntax for FP. These programs are approachable by the broader programming community, and arguably easier to encode than some solutions currently used in mainstream Kotlin and Scala FP communities.
Despite some limitations of the Kotlin type system, like lack of Higher Kinded Types, the Kotlin language is excellent for encoding typed programs with a first-class imperative syntax for FP.
These programs are approachable by the broader programming community, and arguably easier to encode than some solutions currently used in mainstream Kotlin and Scala FP communities.

Come check out Arrow fx and join the discussion in the [Kotlin Slack](https://kotlinlang.slack.com/messages/C5UPMM0A0) and [Gitter](https://gitter.im/arrow-kt/Lobby)
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package arrow.fx.coroutines

import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

/**
* Environment that can run [suspend] programs using [startCoroutine] and [startCoroutineCancellable].
*
* An [Environment] runs on a certain [CoroutineContext] which is used to start the programs on.
* Since coroutines always return where they were started, this [CoroutineContext] also defines where you return after operators like [sleep] or [evalOn].
* Therefore it's advised to always run on [ComputationPool] which is the default setting.
JorgeCastilloPrz marked this conversation as resolved.
Show resolved Hide resolved
*
* [Environment] also has an [asyncErrorHandler] which by default redirects to [Throwable.printStackTrace],
* no user flow errors will ever be send here [CancelToken] exceptions might bubble up here when they cannot be redirect to the user.
*
* This [Environment] is meant to be used in Java frameworks, or frameworks that do not expose [suspend] edge-points.
*
* This contract could be elaborated on Android to provide automatic cancellation on Android LifecycleOwner.
*/
interface Environment {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the implementation. Can't wait to use it! 😃


/**
* Start [CoroutineContext] of the programs ran using [startCoroutine] and [startCoroutineCancellable].
*/
val ctx: CoroutineContext

/**
* The async error handler is reserved for rare exceptions.
* When an error occurs after the completion of a [kotlin.coroutines.Continuation],
* or when an error occurs when triggering a [CancelToken].
*
* This will never be invoked for a [Throwable] that occurred in a user flow.
*/
fun asyncErrorHandler(e: Throwable): Unit

/**
* Execution strategy that will block the current thread that's waiting for the program to yield a value.
*/
fun <A> unsafeRunSync(fa: suspend () -> A): A

/**
* Execution strategies that will immediately return and perform the program's work without blocking the current thread.
* This operation runs uncancellable.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be worth adding a note about how exceptions are handled in this particular case.

*/
fun unsafeRunAsync(fa: suspend () -> Unit): Unit =
unsafeRunAsync(fa, { throw it }, { /* Finished normally */ })

/**
* Execution strategies that will immediately return and perform the program's work without blocking the current thread.
* This operation runs uncancellable.
*/
fun <A> unsafeRunAsync(fa: suspend () -> A, e: (Throwable) -> Unit, a: (A) -> Unit): Unit

/**
* Execution strategies that will immediately return and perform the program's work without blocking the current thread.
* Runs the operation [fa] in a cancellable way.
* Operation can be cancelled by invoking the returned [Disposable].
*/
fun unsafeRunAsyncCancellable(fa: suspend () -> Unit): Disposable =
unsafeRunAsyncCancellable(fa, { throw it }, { /* Finished normally */ })

fun <A> unsafeRunAsyncCancellable(fa: suspend () -> A, e: (Throwable) -> Unit, a: (A) -> Unit): Disposable

companion object {
operator fun invoke(ctx: CoroutineContext = ComputationPool): Environment =
DefaultEnvironment(ctx)
}
}

internal class DefaultEnvironment(override val ctx: CoroutineContext) : Environment {

override fun asyncErrorHandler(e: Throwable) =
e.printStackTrace()

override fun <A> unsafeRunSync(fa: suspend () -> A): A =
Platform.unsafeRunSync(ctx, fa)

override fun <A> unsafeRunAsync(fa: suspend () -> A, e: (Throwable) -> Unit, a: (A) -> Unit): Unit =
fa.startCoroutine(Continuation(ctx) { res -> res.fold(a, e) })

override fun <A> unsafeRunAsyncCancellable(fa: suspend () -> A, e: (Throwable) -> Unit, a: (A) -> Unit): Disposable =
fa.startCoroutineCancellable(CancellableContinuation(ctx) { res ->
res.fold(a, e) // Return error to caller
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,16 @@ import java.util.concurrent.TimeoutException
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random

/*inline*/ class Stream<out O> internal constructor(internal val asPull: Pull<O, Unit>) {
class ForStream private constructor() {
companion object
}

typealias StreamOf<A> = arrow.Kind<ForStream, A>

inline fun <A> StreamOf<A>.fix(): Stream<A> =
this as Stream<A>

/*inline*/ class Stream<out O> internal constructor(internal val asPull: Pull<O, Unit>) : StreamOf<O> {

/**
* Creates a stream whose elements are generated by applying [f] to each output of
Expand Down
Loading