Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Update Arrow "cancel" wording for consistency #82

Merged
merged 6 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ open class Cancellable {
@Param("100")
var size: Int = 0

fun evalCancelable(n: Int): IO<Int> =
IO.concurrent().cancelable<Int> { cb ->
fun evalCancellable(n: Int): IO<Int> =
IO.concurrent().cancellable<Int> { cb ->
cb(Right(n))
IO.unit
}.fix()

fun cancelableLoop(i: Int): IO<Int> =
if (i < size) evalCancelable(i + 1).flatMap { cancelableLoop(it) }
else evalCancelable(i)
fun cancellableLoop(i: Int): IO<Int> =
if (i < size) evalCancellable(i + 1).flatMap { cancellableLoop(it) }
else evalCancellable(i)

@Benchmark
fun io(): Int =
cancelableLoop(0).unsafeRunSync()
cancellableLoop(0).unsafeRunSync()
}
6 changes: 3 additions & 3 deletions arrow-benchmarks-fx/src/jmh/kotlin/arrow/benchmarks/Queue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.extensions.io.functor.unit
import arrow.fx.extensions.io.monad.flatMap
import arrow.fx.fix
import arrow.fx.internal.CancelableQueue
import arrow.fx.internal.CancellableQueue
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.CompilerControl
import org.openjdk.jmh.annotations.Fork
Expand Down Expand Up @@ -39,7 +39,7 @@ open class Queue {
@Setup(Level.Trial)
fun createQueues(): Unit {
ConcurQueue = ConcurrentQueue.empty<ForIO, Int>(IO.concurrent()).fix().unsafeRunSync()
CancelQueue = CancelableQueue.empty<ForIO, Int>(IO.concurrent()).fix().unsafeRunSync()
CancelQueue = CancellableQueue.empty<ForIO, Int>(IO.concurrent()).fix().unsafeRunSync()
}

fun <A> IOOf<A>.repeat(n: Int): IO<A> =
Expand All @@ -54,5 +54,5 @@ open class Queue {
fun concurrentQueue(): Unit = loop(ConcurQueue)

@Benchmark
fun cancelableQueue(): Unit = loop(CancelQueue)
fun cancellableQueue(): Unit = loop(CancelQueue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ open class Uncancellable {
@Param("100")
var size: Int = 0

fun ioUncancelableLoop(i: Int): IO<Int> =
if (i < size) IO { i + 1 }.uncancelable().flatMap { ioUncancelableLoop(it) }
fun ioUncancellableLoop(i: Int): IO<Int> =
if (i < size) IO { i + 1 }.uncancellable().flatMap { ioUncancellableLoop(it) }
else IO.just(i)

@Benchmark
fun io(): Int = ioUncancelableLoop(0).unsafeRunSync()
fun io(): Int = ioUncancellableLoop(0).unsafeRunSync()
}
4 changes: 2 additions & 2 deletions arrow-docs/docs/fiber/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ permalink: /effects/fiber/


A `Fiber` is a concurrency primitive for describing parallel operations or multi-tasking.
Concurrently started tasks can either be joined or canceled, and these are the only two operators available on `Fiber`.
Concurrently started tasks can either be joined or cancelled, and these are the only two operators available on `Fiber`.

Using `Fiber`, we can describe parallel operations such as `parallelMap` relatively easily.
**Note** the operation written below does not support proper cancellation.
When the resulting `IO` is canceled, it does not propagate this cancellation back to the underlying `IO`.
When the resulting `IO` is cancelled, it does not propagate this cancellation back to the underlying `IO`.

```kotlin:ank
import arrow.fx.*
Expand Down
6 changes: 3 additions & 3 deletions arrow-docs/docs/fx/typeclasses/bracket/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ println(safeComputation)
#### bracketCase

It's a generalized version of `bracket()` that uses `ExitCase` to distinguish between different exit cases when
releasing the acquired resource. `ExitCase` can take the values `Completed`, `Canceled`, or `Error(e)`. So, depending
releasing the acquired resource. `ExitCase` can take the values `Completed`, `Cancelled`, or `Error(e)`. So, depending
how the `use` execution finalizes, the corresponding `ExitCase` value will be passed to the `release` lambda.

It requires passing `release` and `use` lambdas. It ensures acquiring, using, and releasing the resource at the end.
Expand Down Expand Up @@ -282,7 +282,7 @@ val safeComputation = openFile("data.json").bracketCase(
release = { file, exitCase ->
when (exitCase) {
is ExitCase.Completed -> { /* do something */ }
is ExitCase.Canceled -> { /* do something */ }
is ExitCase.Cancelled -> { /* do something */ }
is ExitCase.Error -> { /* do something */ }
}
closeFile(file)
Expand All @@ -297,7 +297,7 @@ println(safeComputation)

#### guarantee/guaranteeCase - onCancel/onError

Ignores the acquisition and focuses on using a resource and performing an action whenever it finishes in any way (completed, error, canceled).
Ignores the acquisition and focuses on using a resource and performing an action whenever it finishes in any way (completed, error, cancelled).

Similarly as for `bracketCase`, `guaranteeCase` works in the same way as `guarantee` but uses `ExitCase` to distinguish between different exit cases when
releasing the acquired resource.
Expand Down
2 changes: 1 addition & 1 deletion arrow-docs/docs/integrations/kotlinxcoroutines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ If you'd like to introduce `IO` in your project, you might want to keep using th

### unsafeRunScoped & unsafeRunIO

`IO.unsafeRunScoped(scope, cb)` runs the specific `IO` with a [CoroutineScope](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html), so it will be automatically canceled when the scope does as well.
`IO.unsafeRunScoped(scope, cb)` runs the specific `IO` with a [CoroutineScope](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html), so it will be automatically cancelled when the scope does as well.

Similarly, there's `scope.unsafeRunIO(IO, cb)`, which works in the same way with different syntax:

Expand Down
20 changes: 10 additions & 10 deletions arrow-docs/docs/promise/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ A `Promise` guarantees (promises) `A` at some point in the future within the con

## Constructing a Promise

A promise can easily be made by calling `uncancelable`.
A promise can easily be made by calling `uncancellable`.
Since the allocation of mutable state is not referentially transparent, this side-effect is contained within `F`.

```kotlin:ank:playground
Expand All @@ -24,21 +24,21 @@ import arrow.fx.extensions.io.async.async
fun main(args: Array<String>) {
//sampleStart
val promise: IO<Promise<ForIO, Int>> =
Promise.uncancelable<ForIO, Int>(IO.async()).fix()
Promise.uncancellable<ForIO, Int>(IO.async()).fix()
//sampleEnd
println(promise)
}
```

In case you want the side-effect to execute immediately and return the `Promise` instance, you can use the `unsafeUncancelable` function.
In case you want the side-effect to execute immediately and return the `Promise` instance, you can use the `unsafeUncancellable` function.

```kotlin:ank:playground
import arrow.fx.*
import arrow.fx.extensions.io.async.async

fun main(args: Array<String>) {
//sampleStart
val unsafePromise: Promise<ForIO, Int> = Promise.unsafeUncancelable(IO.async())
val unsafePromise: Promise<ForIO, Int> = Promise.unsafeUncancellable(IO.async())
//sampleEnd
println(unsafePromise)
}
Expand All @@ -55,7 +55,7 @@ import arrow.fx.extensions.io.monad.flatMap

fun main(args: Array<String>) {
//sampleStart
Promise.uncancelable<ForIO, Int>(IO.async()).flatMap { p ->
Promise.uncancellable<ForIO, Int>(IO.async()).flatMap { p ->
p.get()
} //never ends because `get` keeps waiting for p to be fulfilled.
//sampleEnd
Expand All @@ -69,7 +69,7 @@ import arrow.fx.extensions.io.monad.flatMap

fun main(args: Array<String>) {
//sampleStart
val result = Promise.uncancelable<ForIO, Int>(IO.async()).flatMap { p ->
val result = Promise.uncancellable<ForIO, Int>(IO.async()).flatMap { p ->
p.complete(1).flatMap {
p.get()
}
Expand All @@ -90,7 +90,7 @@ import arrow.fx.extensions.io.monad.flatMap

fun main(args: Array<String>) {
//sampleStart
val result = Promise.uncancelable<ForIO, Int>(IO.async()).flatMap { p ->
val result = Promise.uncancellable<ForIO, Int>(IO.async()).flatMap { p ->
p.complete(2).flatMap {
p.get()
}
Expand All @@ -107,7 +107,7 @@ import arrow.fx.extensions.io.monad.flatMap

fun main(args: Array<String>) {
//sampleStart
val result = Promise.uncancelable<ForIO, Int>(IO.async()).flatMap { p ->
val result = Promise.uncancellable<ForIO, Int>(IO.async()).flatMap { p ->
p.complete(1).flatMap {
p.complete(2)
}
Expand All @@ -130,7 +130,7 @@ import arrow.fx.extensions.io.monad.flatMap

fun main(args: Array<String>) {
//sampleStart
val result = Promise.uncancelable<ForIO, Int>(IO.async()).flatMap { p ->
val result = Promise.uncancellable<ForIO, Int>(IO.async()).flatMap { p ->
p.error(RuntimeException("Break promise"))
}
.attempt()
Expand All @@ -147,7 +147,7 @@ import arrow.fx.extensions.io.monad.flatMap

fun main(args: Array<String>) {
//sampleStart
val result = Promise.uncancelable<ForIO, Int>(IO.async()).flatMap { p ->
val result = Promise.uncancellable<ForIO, Int>(IO.async()).flatMap { p ->
p.complete(1).flatMap {
p.error(RuntimeException("Break promise"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class CoroutinesIntegrationTest : UnitSpec() {
val promise = !Promise<Int>()
!effect {
scope.launch {
IO.cancelable<Unit> { promise.complete(i) }.suspendCancellable()
IO.cancellable<Unit> { promise.complete(i) }.suspendCancellable()
}
}
!effect { scope.cancel() }
Expand Down Expand Up @@ -135,7 +135,7 @@ class CoroutinesIntegrationTest : UnitSpec() {
val scope = TestCoroutineScope(Job() + TestCoroutineDispatcher())
val promise = !Promise<Int>()
!effect {
IO.cancelable<Unit> { promise.complete(i) }.unsafeRunScoped(scope) { }
IO.cancellable<Unit> { promise.complete(i) }.unsafeRunScoped(scope) { }
}
!effect { scope.cancel() }
!promise.get()
Expand Down
22 changes: 15 additions & 7 deletions arrow-fx-reactor/src/main/kotlin/arrow/fx/reactor/FluxK.kt
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ data class FluxK<out A>(val flux: Flux<out A>) : FluxKOf<A> {
* release = { file, exitCase ->
* when (exitCase) {
* is ExitCase.Completed -> { /* do something */ }
* is ExitCase.Canceled -> { /* do something */ }
* is ExitCase.Cancelled -> { /* do something */ }
* is ExitCase.Error -> { /* do something */ }
* }
* closeFile(file)
Expand All @@ -94,13 +94,13 @@ data class FluxK<out A>(val flux: Flux<out A>) : FluxKOf<A> {
fun <B> bracketCase(use: (A) -> FluxKOf<B>, release: (A, ExitCase<Throwable>) -> FluxKOf<Unit>): FluxK<B> =
FluxK(Flux.create<B> { sink ->
flux.subscribe({ a ->
if (sink.isCancelled) release(a, ExitCase.Canceled).fix().flux.subscribe({}, sink::error)
if (sink.isCancelled) release(a, ExitCase.Cancelled).fix().flux.subscribe({}, sink::error)
else try {
sink.onDispose(use(a).fix()
.flatMap { b -> release(a, ExitCase.Completed).fix().map { b } }
.handleErrorWith { e -> release(a, ExitCase.Error(e)).fix().flatMap { FluxK.raiseError<B>(e) } }
.flux
.doOnCancel { release(a, ExitCase.Canceled).fix().flux.subscribe({}, sink::error) }
.doOnCancel { release(a, ExitCase.Cancelled).fix().flux.subscribe({}, sink::error) }
.subscribe({ sink.next(it) }, sink::error, { }, {
sink.onRequest(it::request)
})
Expand Down Expand Up @@ -208,8 +208,8 @@ data class FluxK<out A>(val flux: Flux<out A>) : FluxKOf<A> {
* ```
*/
@Deprecated(message =
"For wrapping cancelable operations you should use cancelable instead.\n" +
"For wrapping uncancelable operations you can use the non-deprecated async")
"For wrapping cancellable operations you should use cancellable instead.\n" +
"For wrapping uncancellable operations you can use the non-deprecated async")
fun <A> async(fa: FluxKProc<A>): FluxK<A> =
Flux.create<A> { sink ->
val conn = FluxKConnection()
Expand Down Expand Up @@ -241,8 +241,8 @@ data class FluxK<out A>(val flux: Flux<out A>) : FluxKOf<A> {
}.k()

@Deprecated(message =
"For wrapping cancelable operations you should use cancelableF instead.\n" +
"For wrapping uncancelable operations you can use the non-deprecated asyncF")
"For wrapping cancellable operations you should use cancellableF instead.\n" +
"For wrapping uncancellable operations you can use the non-deprecated asyncF")
fun <A> asyncF(fa: FluxKProcF<A>): FluxK<A> =
Flux.create { sink: FluxSink<A> ->
val conn = FluxKConnection()
Expand Down Expand Up @@ -273,7 +273,11 @@ data class FluxK<out A>(val flux: Flux<out A>) : FluxKOf<A> {
}.fix().flux.subscribe({}, sink::error)
}.k()

@Deprecated("Renaming this api for consistency", ReplaceWith("cancellable(fa)"))
fun <A> cancelable(fa: ((Either<Throwable, A>) -> Unit) -> CancelToken<ForFluxK>): FluxK<A> =
cancellable(fa)

fun <A> cancellable(fa: ((Either<Throwable, A>) -> Unit) -> CancelToken<ForFluxK>): FluxK<A> =
Flux.create<A> { sink ->
val token = fa { either: Either<Throwable, A> ->
either.fold({ e ->
Expand All @@ -286,7 +290,11 @@ data class FluxK<out A>(val flux: Flux<out A>) : FluxKOf<A> {
sink.onDispose { token.value().subscribe({}, sink::error) }
}.k()

@Deprecated("Renaming this api for consistency", ReplaceWith("cancellableF(fa)"))
fun <A> cancelableF(fa: ((Either<Throwable, A>) -> Unit) -> FluxKOf<CancelToken<ForFluxK>>): FluxK<A> =
cancellableF(fa)

fun <A> cancellableF(fa: ((Either<Throwable, A>) -> Unit) -> FluxKOf<CancelToken<ForFluxK>>): FluxK<A> =
Flux.create<A> { sink ->
val cb = { either: Either<Throwable, A> ->
either.fold({ e ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import arrow.fx.KindConnection
import arrow.fx.typeclasses.ExitCase
import arrow.fx.typeclasses.MonadDefer

@Deprecated("Cancellation should be done with the cancelable combinator")
@Deprecated("Cancellation should be done with the cancellable combinator")
typealias FluxKProc<A> = (KindConnection<ForFluxK>, (Either<Throwable, A>) -> Unit) -> Unit
@Deprecated("Cancellation should be done with the cancelable combinator")
@Deprecated("Cancellation should be done with the cancellable combinator")
typealias FluxKProcF<A> = (KindConnection<ForFluxK>, (Either<Throwable, A>) -> Unit) -> FluxKOf<Unit>

/**
Expand All @@ -22,7 +22,7 @@ typealias FluxKProcF<A> = (KindConnection<ForFluxK>, (Either<Throwable, A>) -> U
*/
@Suppress("UNUSED_PARAMETER", "FunctionName")
@Deprecated(message = "Cancelling operations through FluxKConnection will not be supported anymore." +
"In case you need to cancel multiple processes can do so by using cancelable and composing cancel operations using zipWith or other parallel operators")
"In case you need to cancel multiple processes can do so by using cancellable and composing cancel operations using zipWith or other parallel operators")
fun FluxKConnection(dummy: Unit = Unit): KindConnection<ForFluxK> = KindConnection(object : MonadDefer<ForFluxK> {
override fun <A> defer(fa: () -> FluxKOf<A>): FluxK<A> =
FluxK.defer(fa)
Expand Down
Loading