Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Racing DSL #3411

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,21 @@ public final class arrow/fx/coroutines/Race3Kt {
public static synthetic fun raceN$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/RacingKt {
public static final fun racing (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun racing$default (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface class arrow/fx/coroutines/RacingScope {
public abstract fun race (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun raceOrFail (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/RacingScope$DefaultImpls {
public static synthetic fun race$default (Larrow/fx/coroutines/RacingScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun raceOrFail$default (Larrow/fx/coroutines/RacingScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface annotation class arrow/fx/coroutines/ResourceDSL : java/lang/annotation/Annotation {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package arrow.fx.coroutines

import arrow.atomic.Atomic
import arrow.atomic.update
import arrow.atomic.value
import arrow.core.identity
import arrow.core.nonFatalOrThrow
import arrow.core.prependTo
import arrow.core.raise.DelicateRaiseApi
import arrow.core.raise.RaiseCancellationException
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.selects.select
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.cancellation.CancellationException

/**
* A DSL that allows racing many `suspend` functions in parallel against each-other,
* it yields a final result of [A] based on the first function that yields a result.
* A racer can yield a result based on [RacingScope.race], or [RacingScope.raceOrFail].
*
* [RacingScope.race] will call [handleException] in case of an exception,
* and then await **another successful result** but not cancel the race. Whilst [RacingScope.raceOrFail] will cancel the race,
* and rethrow the exception that occurred and thus cancel the race and all participating racers.
*
* ```kotlin
* suspend fun winner(): String = racing {
* race { delay(1000); "Winner" }
* race { throw RuntimeException("Loser") }
* } // Winner (logged RuntimeException)
*
* suspend fun winner(): String = racing {
* race { delay(1000); "loser" }
* raceOrFail { throw RuntimeException("Loser") }
* } // RuntimeException
* ```
*
* **Important:** a racing program with no racers will hang forever.
* ```kotlin
* suspend fun never(): Nothing = racing { }
* ```
*
* @param handleException handle any exception that occurred in [RacingScope.race],
* by default it [Throwable.printStackTrace].
* @param block the body of the DSL that describes the racing logic
* @return the winning value of [A].
*/
public suspend fun <A> racing(
handleException: ((context: CoroutineContext, exception: Throwable) -> Unit)? = null,
block: RacingScope<A>.() -> Unit,
): A = coroutineScope {
val exceptionHandler = handleException ?: defaultExceptionHandler()::handleException
select {
val scope = SelectRacingScope(this@select, this@coroutineScope, exceptionHandler)
block(scope)
// TODO add this check??
require(scope.racers.value.isNotEmpty()) { "A racing program with no racers can never yield a result." }
}
}

public interface RacingScope<A> {
public suspend fun race(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> A
)

public suspend fun raceOrFail(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> A
)
}

private class SelectRacingScope<A>(
private val select: SelectBuilder<A>,
private val scope: CoroutineScope,
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
private val handleException: (context: CoroutineContext, exception: Throwable) -> Unit
) : RacingScope<A>, CoroutineScope by scope {
val racers: Atomic<List<Deferred<A>>> = Atomic(emptyList())

override suspend fun raceOrFail(
context: CoroutineContext,
block: suspend CoroutineScope.() -> A
) {
/* First we create a lazy racer,
* and we add it in front of the existing racers such that we maintain correct order.
* After we've successfully registered the racer, we check for race conditions,
* and 'start' racing.
*/
val racer = scope.async(
nomisRev marked this conversation as resolved.
Show resolved Hide resolved
start = CoroutineStart.LAZY,
context = context,
block = block
)
racers.update { racer prependTo it }
if (isActive) {
require(racer.start()) { "Racer not started" }
return with(select) {
racer.onAwait.invoke(::identity)
}
}
}

// To not fail the entire race, we allow users to handle the exceptions but do not cancel the race.
@OptIn(DelicateRaiseApi::class)
override suspend fun race(context: CoroutineContext, block: suspend CoroutineScope.() -> A) =
raceOrFail {
try {
block()
} catch (e: RaiseCancellationException) {
// `Raise<E>` error is ignored... Can we do better here?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe raise shouldn't be considered a failure? Perhaps raising in such a way is conceptually the same as returning a value. Alternatively, we can have a raceOrRaise

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe raise shouldn't be considered a failure? Perhaps raising in such a way is conceptually the same as returning a value. Alternatively, we can have a raceOrRaise

Hmm. That kind-of makes sense, but I'd still expect user to want to 'select' E and not A but supporting both would be great.

So how do you imagine the API:

  • race (first A)
  • raceOrRaise ( first A or E)
  • raceOrThrow (first A, E or Throwable)

?? Do we need something that 'selects' first A or Throwable?

Potentially we can do a different extension when you reside within Raise, so an extension Raise<E>.racing(...) but then it's probably still possible to easily select the non-Raise variant 🤔.

suspend fun Raise<E>.racing(...): A
suspend fun racing(..): A

This probably doesn't yield the benefit we'd want since the non-Raise racing would get suggested when Raise doesn't match or Raise.racing has an additional param.

awaitCancellation()
} catch (e: Throwable) {
handleException(currentCoroutineContext(), e.nonFatalOrThrow())
awaitCancellation()
}
}
}

private suspend fun defaultExceptionHandler(): CoroutineExceptionHandler =
currentCoroutineContext()[CoroutineExceptionHandler] ?: DefaultCoroutineExceptionHandler

private object DefaultCoroutineExceptionHandler : CoroutineExceptionHandler {
override val key: CoroutineContext.Key<CoroutineExceptionHandler> = CoroutineExceptionHandler

override fun handleException(context: CoroutineContext, exception: Throwable) {
if (exception !is CancellationException) exception.printStackTrace()
}
}