diff --git a/arrow-libs/core/arrow-core-retrofit/src/test/kotlin/arrow/retrofit/adapter/either/networkhandling/NetworkEitherCallAdapterTest.kt b/arrow-libs/core/arrow-core-retrofit/src/test/kotlin/arrow/retrofit/adapter/either/networkhandling/NetworkEitherCallAdapterTest.kt index f7705dfa5a1..90788295ec9 100644 --- a/arrow-libs/core/arrow-core-retrofit/src/test/kotlin/arrow/retrofit/adapter/either/networkhandling/NetworkEitherCallAdapterTest.kt +++ b/arrow-libs/core/arrow-core-retrofit/src/test/kotlin/arrow/retrofit/adapter/either/networkhandling/NetworkEitherCallAdapterTest.kt @@ -20,7 +20,7 @@ import retrofit2.Converter import retrofit2.Retrofit import retrofit2.converter.gson.GsonConverterFactory import retrofit2.converter.moshi.MoshiConverterFactory -import java.net.ConnectException +import java.net.SocketException import java.net.SocketTimeoutException @ExperimentalSerializationApi @@ -96,7 +96,7 @@ private fun networkEitherCallAdapterTests( body.shouldBeInstanceOf>() .value.shouldBeInstanceOf() - .cause.shouldBeInstanceOf() + .cause.shouldBeInstanceOf() } "should return IOError when no response" { diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/Either.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/Either.kt index 0d9d123a44d..7e47738d6bb 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/Either.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/Either.kt @@ -1311,8 +1311,7 @@ public fun Either<*, B>.orNull(): B? = orNull() /** - * Returns the value from this [Right] or allows clients to transform [Left] to [Right] while providing access to - * the value of [Left]. + * Returns the value from this [Right] or allows clients to transform the value from [Left] with the [default] lambda. * * Example: * ```kotlin diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/Ior.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/Ior.kt index a2f6b55a503..9983e985121 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/Ior.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/Ior.kt @@ -23,7 +23,7 @@ public typealias IorNel = Ior, B> * [Ior]<`A`,`B`> is similar to [Either]<`A`,`B`>, except that it can represent the simultaneous presence of * an `A` and a `B`. It is right-biased so methods such as `map` and `flatMap` operate on the * `B` value. Some methods, like `flatMap`, handle the presence of two [Ior.Both] values using a - * `[Semigroup]<`A`>, while other methods, like [toEither], ignore the `A` value in a [Ior.Both Both]. + * [Semigroup]<`A`>, while other methods, like [toEither], ignore the `A` value in a [Ior.Both Both]. * * [Ior]<`A`,`B`> is isomorphic to [Either]<[Either]<`A`,`B`>, [Pair]<`A`,`B`>>, but provides methods biased toward `B` * values, regardless of whether the `B` values appear in a [Ior.Right] or a [Ior.Both]. diff --git a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/NonEmptyList.kt b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/NonEmptyList.kt index 80e58e68eb1..d823b9fc482 100644 --- a/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/NonEmptyList.kt +++ b/arrow-libs/core/arrow-core/src/commonMain/kotlin/arrow/core/NonEmptyList.kt @@ -237,10 +237,9 @@ public class NonEmptyList( @Deprecated( "Use toNonEmptyListOrNull instead", ReplaceWith( - "Option.fromNullable>(l.toNonEmptyListOrNull())", + "l.toNonEmptyListOrNull().toOption()", "import arrow.core.toNonEmptyListOrNull", - "import arrow.core.Option", - "import arrow.core.NonEmptyList" + "import arrow.core.toOption" ) ) @JvmStatic diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index fbc6e73af25..be169607705 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -81,7 +81,9 @@ public final class arrow/fx/coroutines/CircuitBreaker$State$Closed : arrow/fx/co public final class arrow/fx/coroutines/CircuitBreaker$State$HalfOpen : arrow/fx/coroutines/CircuitBreaker$State { public fun (D)V + public synthetic fun (JLkotlin/jvm/internal/DefaultConstructorMarker;)V public fun equals (Ljava/lang/Object;)Z + public final fun getResetTimeout-UwyO8pc ()J public final fun getResetTimeoutNanos ()D public fun hashCode ()I public fun toString ()Ljava/lang/String; @@ -89,8 +91,10 @@ public final class arrow/fx/coroutines/CircuitBreaker$State$HalfOpen : arrow/fx/ public final class arrow/fx/coroutines/CircuitBreaker$State$Open : arrow/fx/coroutines/CircuitBreaker$State { public fun (JD)V + public synthetic fun (JJLkotlin/jvm/internal/DefaultConstructorMarker;)V public fun equals (Ljava/lang/Object;)Z public final fun getExpiresAt ()J + public final fun getResetTimeout-UwyO8pc ()J public final fun getResetTimeoutNanos ()D public final fun getStartedAt ()J public fun hashCode ()I @@ -98,6 +102,7 @@ public final class arrow/fx/coroutines/CircuitBreaker$State$Open : arrow/fx/coro } public abstract class arrow/fx/coroutines/ExitCase { + public static final field Companion Larrow/fx/coroutines/ExitCase$Companion; } public final class arrow/fx/coroutines/ExitCase$Cancelled : arrow/fx/coroutines/ExitCase { @@ -111,6 +116,10 @@ public final class arrow/fx/coroutines/ExitCase$Cancelled : arrow/fx/coroutines/ public fun toString ()Ljava/lang/String; } +public final class arrow/fx/coroutines/ExitCase$Companion { + public final fun ExitCase (Ljava/lang/Throwable;)Larrow/fx/coroutines/ExitCase; +} + public final class arrow/fx/coroutines/ExitCase$Completed : arrow/fx/coroutines/ExitCase { public static final field INSTANCE Larrow/fx/coroutines/ExitCase$Completed; public fun toString ()Ljava/lang/String; @@ -328,6 +337,7 @@ public final class arrow/fx/coroutines/ResourceExtensionsKt { } public final class arrow/fx/coroutines/ResourceKt { + public static final fun allocated (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun asFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun resource (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)Lkotlin/jvm/functions/Function2; public static final fun resource (Lkotlin/jvm/functions/Function2;)Lkotlin/jvm/functions/Function2; diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt index d92e4be3343..d9273f3ef95 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Bracket.kt @@ -13,6 +13,11 @@ public sealed class ExitCase { public data class Cancelled(val exception: CancellationException) : ExitCase() public data class Failure(val failure: Throwable) : ExitCase() + + public companion object { + public fun ExitCase(error: Throwable): ExitCase = + if (error is CancellationException) Cancelled(error) else Failure(error) + } } /** diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/CircuitBreaker.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/CircuitBreaker.kt index ccc8a6cd478..81c991cde98 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/CircuitBreaker.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/CircuitBreaker.kt @@ -8,8 +8,9 @@ import arrow.fx.coroutines.CircuitBreaker.State.HalfOpen import arrow.fx.coroutines.CircuitBreaker.State.Open import kotlinx.coroutines.CompletableDeferred import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.DurationUnit -import kotlin.time.ExperimentalTime /** * A [CircuitBreaker] is used to `protect` resources or services from being overloaded @@ -129,15 +130,18 @@ public class CircuitBreaker private constructor( private val state: AtomicRef, private val maxFailures: Int, - private val resetTimeout: Double, + private val resetTimeoutNanos: Double, private val exponentialBackoffFactor: Double, - private val maxResetTimeout: Double, + private val maxResetTimeoutNanos: Double, private val onRejected: suspend () -> Unit, private val onClosed: suspend () -> Unit, private val onHalfOpen: suspend () -> Unit, private val onOpen: suspend () -> Unit ) { - + + private val resetTimeout: Duration = resetTimeoutNanos.nanoseconds + private val maxResetTimeout: Duration = maxResetTimeoutNanos.nanoseconds + /** Returns the current [CircuitBreaker.State], meant for debugging purposes. */ public suspend fun state(): State = state.get() @@ -191,10 +195,10 @@ private constructor( // task to execute, while transitioning into HalfOpen if (!state.compareAndSet( curr, - State.HalfOpen(curr.resetTimeoutNanos, curr.awaitClose) + HalfOpen(curr.resetTimeout, curr.awaitClose) ) ) protectOrThrow(fa) // retry! - else attemptReset(fa, curr.resetTimeoutNanos, curr.awaitClose, curr.startedAt) + else attemptReset(fa, curr.resetTimeout, curr.awaitClose, curr.startedAt) } else { // Open isn't expired, so we need to fail val expiresInMillis = curr.expiresAt - now @@ -234,7 +238,7 @@ private constructor( if (curr.failures + 1 < maxFailures) { // It's fine, just increment the failures count val update = Closed(curr.failures + 1) - if (!state.compareAndSet(curr, update)) markOrResetFailures(result) // retry? + if (!state.compareAndSet(curr, update)) markOrResetFailures(result) // retry? else throw result.value } else { // N.B. this could be canceled, however we don't care @@ -242,7 +246,7 @@ private constructor( // We've gone over the permitted failures threshold, // so we need to open the circuit breaker val update = Open(now, resetTimeout, CompletableDeferred()) - if (!state.compareAndSet(curr, update)) markOrResetFailures(result) // retry + if (!state.compareAndSet(curr, update)) markOrResetFailures(result) // retry else { onOpen.invoke() throw result.value @@ -268,7 +272,7 @@ private constructor( */ private suspend fun attemptReset( task: suspend () -> A, - resetTimeout: Double, + resetTimeout: Duration, awaitClose: CompletableDeferred, lastStartedAt: Long ): A = @@ -292,8 +296,8 @@ private constructor( } is ExitCase.Failure -> { // Failed reset, which means we go back in the Open state with new expiry val nextTimeout - val value: Double = (resetTimeout * exponentialBackoffFactor) - val nextTimeout: Double = + val value: Duration = (resetTimeout * exponentialBackoffFactor) + val nextTimeout: Duration = if (maxResetTimeout.isFinite() && value > maxResetTimeout) maxResetTimeout else value val ts = timeInMillis() @@ -321,9 +325,9 @@ private constructor( CircuitBreaker( state = state, maxFailures = maxFailures, - resetTimeout = resetTimeout, + resetTimeoutNanos = resetTimeout.toDouble(DurationUnit.NANOSECONDS), exponentialBackoffFactor = exponentialBackoffFactor, - maxResetTimeout = maxResetTimeout, + maxResetTimeoutNanos = maxResetTimeout.toDouble(DurationUnit.NANOSECONDS), onRejected = suspend { onRejected.invoke(); callback.invoke() }, onClosed = onClosed, onHalfOpen = onHalfOpen, @@ -347,9 +351,9 @@ private constructor( CircuitBreaker( state = state, maxFailures = maxFailures, - resetTimeout = resetTimeout, + resetTimeoutNanos = resetTimeout.toDouble(DurationUnit.NANOSECONDS), exponentialBackoffFactor = exponentialBackoffFactor, - maxResetTimeout = maxResetTimeout, + maxResetTimeoutNanos = maxResetTimeout.toDouble(DurationUnit.NANOSECONDS), onRejected = onRejected, onClosed = suspend { onClosed.invoke(); callback.invoke(); }, onHalfOpen = onHalfOpen, @@ -373,9 +377,9 @@ private constructor( CircuitBreaker( state = state, maxFailures = maxFailures, - resetTimeout = resetTimeout, + resetTimeoutNanos = resetTimeout.toDouble(DurationUnit.NANOSECONDS), exponentialBackoffFactor = exponentialBackoffFactor, - maxResetTimeout = maxResetTimeout, + maxResetTimeoutNanos = maxResetTimeout.toDouble(DurationUnit.NANOSECONDS), onRejected = onRejected, onClosed = onClosed, onHalfOpen = suspend { onHalfOpen.invoke(); callback.invoke() }, @@ -399,9 +403,9 @@ private constructor( CircuitBreaker( state = state, maxFailures = maxFailures, - resetTimeout = resetTimeout, + resetTimeoutNanos = resetTimeout.toDouble(DurationUnit.NANOSECONDS), exponentialBackoffFactor = exponentialBackoffFactor, - maxResetTimeout = maxResetTimeout, + maxResetTimeoutNanos = maxResetTimeout.toDouble(DurationUnit.NANOSECONDS), onRejected = onRejected, onClosed = onClosed, onHalfOpen = onHalfOpen, @@ -447,20 +451,50 @@ private constructor( * @param startedAt is the timestamp in milliseconds since the * epoch when the transition to [Open] happened. * - * @param resetTimeoutNanos is the current `resetTimeout` that is + * @param resetTimeout is the current `resetTimeout` that is * applied to this `Open` state, to be multiplied by the * exponential backoff factor for the next transition from * `HalfOpen` to `Open`. */ public class Open internal constructor( public val startedAt: Long, - public val resetTimeoutNanos: Double, - internal val awaitClose: CompletableDeferred + public val resetTimeout: Duration, + internal val awaitClose: CompletableDeferred, ) : State() { - + + @Deprecated( + "Prefer to use resetTimeout with kotlin.time.Duration", + ReplaceWith( + "resetTimeout.toDouble(DurationUnit.NANOSECONDS)", + "kotlin.time.DurationUnit" + ) + ) + public val resetTimeoutNanos: Double + get() = resetTimeout.toDouble(DurationUnit.NANOSECONDS) + + public constructor(startedAt: Long, resetTimeout: Duration) : this( + startedAt, + resetTimeout, + CompletableDeferred() + ) + + @Deprecated( + "This constructor will be removed in Arrow 2.0", + level = DeprecationLevel.WARNING + ) + internal constructor( + startedAt: Long, + resetTimeoutNanos: Double, + awaitClose: CompletableDeferred, + ) : this(startedAt, resetTimeoutNanos.nanoseconds, awaitClose) + + @Deprecated( + "This constructor will be removed in Arrow 2.0", + level = DeprecationLevel.WARNING + ) public constructor(startedAt: Long, resetTimeoutNanos: Double) : this( startedAt, - resetTimeoutNanos, + resetTimeoutNanos.nanoseconds, CompletableDeferred() ) @@ -470,20 +504,20 @@ private constructor( * It is calculated as: * `startedAt + resetTimeout` */ - public val expiresAt: Long = startedAt + (resetTimeoutNanos.toLong() / 1_000_000) + public val expiresAt: Long = resetTimeout.plus(startedAt.milliseconds).toLong(DurationUnit.MILLISECONDS) override fun equals(other: Any?): Boolean = if (other is Open) this.startedAt == startedAt && - this.resetTimeoutNanos == resetTimeoutNanos && + this.resetTimeout == resetTimeout && this.expiresAt == expiresAt else false override fun toString(): String = - "CircuitBreaker.State.Open(startedAt=$startedAt, resetTimeoutNanos=$resetTimeoutNanos, expiresAt=$expiresAt)" + "CircuitBreaker.State.Open(startedAt=$startedAt, resetTimeoutNanos=$resetTimeout, expiresAt=$expiresAt)" override fun hashCode(): Int { var result = startedAt.hashCode() - result = 31 * result + resetTimeoutNanos.hashCode() + result = 31 * result + resetTimeout.hashCode() result = 31 * result + expiresAt.hashCode() return result } @@ -495,26 +529,47 @@ private constructor( * - If the `test request` succeeds, then the [CircuitBreaker] is tripped back into [Closed], with the reset timeout, and the failures count also reset to their initial values. * - If the `test request` fails, then the [CircuitBreaker] is tripped back into [Open], the [resetTimeout] is multiplied by the [exponentialBackoffFactor], up to the configured [maxResetTimeout]. * - * @param resetTimeoutNanos is the current `reset timeout` that the [CircuitBreaker] has to stay in [Open] state. + * @param resetTimeout is the current `reset timeout` that the [CircuitBreaker] has to stay in [Open] state. * When the `reset timeout` lapsed, than the [CircuitBreaker] will allow a test request to go through in [HalfOpen]. - * If the test request failed, the [CircuitBreaker] will go back into [Open] and it'll multiply the [resetTimeoutNanos] with the the exponential backoff factor. + * If the test request failed, the [CircuitBreaker] will go back into [Open] and it'll multiply the [resetTimeout] with the the exponential backoff factor. */ public class HalfOpen internal constructor( - public val resetTimeoutNanos: Double, + public val resetTimeout: Duration, internal val awaitClose: CompletableDeferred ) : State() { + + @Deprecated( + "Prefer to use resetTimeout with kotlin.time.Duration", + ReplaceWith( + "resetTimeout.toDouble(DurationUnit.NANOSECONDS)", + "kotlin.time.DurationUnit" + ) + ) + public val resetTimeoutNanos: Double + get() = resetTimeout.toDouble(DurationUnit.NANOSECONDS) + + public constructor(resetTimeout: Duration) : this(resetTimeout, CompletableDeferred()) + + @Deprecated( + "This constructor will be removed in Arrow 2.0", + level = DeprecationLevel.WARNING + ) + internal constructor( + resetTimeoutNanos: Double, + awaitClose: CompletableDeferred, + ) : this(resetTimeoutNanos.nanoseconds, awaitClose) - public constructor(resetTimeoutNanos: Double) : this(resetTimeoutNanos, CompletableDeferred()) + public constructor(resetTimeoutNanos: Double) : this(resetTimeoutNanos.nanoseconds, CompletableDeferred()) override fun hashCode(): Int = - resetTimeoutNanos.hashCode() + resetTimeout.hashCode() override fun equals(other: Any?): Boolean = - if (other is HalfOpen) resetTimeoutNanos == other.resetTimeoutNanos + if (other is HalfOpen) resetTimeout == other.resetTimeout else false override fun toString(): String = - "HalfOpen(resetTimeoutNanos=$resetTimeoutNanos)" + "HalfOpen(resetTimeoutNanos=$resetTimeout)" } } @@ -535,7 +590,7 @@ private constructor( * the `resetTimeout` when in the `HalfOpen` state, in case * the attempt to `Close` fails. * - * @param maxResetTimeout is the maximum timeout the circuit breaker + * @param maxResetTimeoutNanos is the maximum timeout the circuit breaker * is allowed to use when applying the `exponentialBackoffFactor`. * * @param onRejected is a callback for signaling rejected tasks, so @@ -550,22 +605,37 @@ private constructor( * @param onOpen is a callback for signaling transitions to [CircuitBreaker.State.Open]. * */ + @Deprecated( + "Prefer the kotlin.time.Duration constructor instead", + ReplaceWith( + "of(maxFailures, resetTimeoutNanos.nanoseconds, exponentialBackoffFactor, maxResetTimeout, onRejected, onClosed, onHalfOpen, onOpen)", + "import kotlin.time.Duration.Companion.nanoseconds" + ) + ) public suspend fun of( maxFailures: Int, resetTimeoutNanos: Double, exponentialBackoffFactor: Double = 1.0, - maxResetTimeout: Double = Double.POSITIVE_INFINITY, + maxResetTimeoutNanos: Double = Double.POSITIVE_INFINITY, onRejected: suspend () -> Unit = { }, onClosed: suspend () -> Unit = { }, onHalfOpen: suspend () -> Unit = { }, - onOpen: suspend () -> Unit = { } + onOpen: suspend () -> Unit = { }, ): CircuitBreaker = CircuitBreaker( state = AtomicRef(Closed(0)), - maxFailures = requireNotNull(maxFailures.takeIf { it >= 0 }) { "maxFailures expected to be higher than 0" }, - resetTimeout = requireNotNull(resetTimeoutNanos.takeIf { it > 0 }) { "resetTimeoutNanos expected to be higher than 0" }, - exponentialBackoffFactor = requireNotNull(exponentialBackoffFactor.takeIf { it > 0 }) { "exponentialBackoffFactor expected to be higher than 0" }, - maxResetTimeout = requireNotNull(maxResetTimeout.takeIf { it > 0 }) { "maxResetTimeout expected to be higher than 0" }, + maxFailures = maxFailures + .takeIf { it >= 0 } + .let { requireNotNull(it) { "maxFailures expected to be greater than or equal to 0, but was $maxFailures" } }, + resetTimeoutNanos = resetTimeoutNanos + .takeIf { it > 0 } + .let { requireNotNull(it) { "resetTimeout expected to be greater than 0, but was $resetTimeoutNanos" } }, + exponentialBackoffFactor = exponentialBackoffFactor + .takeIf { it > 0 } + .let { requireNotNull(it) { "exponentialBackoffFactor expected to be greater than 0, but was $exponentialBackoffFactor" } }, + maxResetTimeoutNanos = maxResetTimeoutNanos + .takeIf { it > 0 } + .let { requireNotNull(it) { "maxResetTimeout expected to be greater than 0, but was $maxResetTimeoutNanos" } }, onRejected = onRejected, onClosed = onClosed, onHalfOpen = onHalfOpen, @@ -601,7 +671,6 @@ private constructor( * @param onOpen is a callback for signaling transitions to [CircuitBreaker.State.Open]. * */ - @ExperimentalTime public suspend fun of( maxFailures: Int, resetTimeout: Duration, @@ -610,17 +679,27 @@ private constructor( onRejected: suspend () -> Unit = suspend { }, onClosed: suspend () -> Unit = suspend { }, onHalfOpen: suspend () -> Unit = suspend { }, - onOpen: suspend () -> Unit = suspend { } + onOpen: suspend () -> Unit = suspend { }, ): CircuitBreaker = of( - maxFailures, - resetTimeout.toDouble(DurationUnit.NANOSECONDS), - exponentialBackoffFactor, - maxResetTimeout.toDouble(DurationUnit.NANOSECONDS), - onRejected, - onClosed, - onHalfOpen, - onOpen + maxFailures = maxFailures + .takeIf { it >= 0 } + .let { requireNotNull(it) { "maxFailures expected to be greater than or equal to 0, but was $maxFailures" } }, + resetTimeoutNanos = resetTimeout + .takeIf { it.isPositive() && it != Duration.ZERO } + .let { requireNotNull(it) { "resetTimeout expected to be greater than ${Duration.ZERO}, but was $resetTimeout" } } + .toDouble(DurationUnit.NANOSECONDS), + exponentialBackoffFactor = exponentialBackoffFactor + .takeIf { it > 0 } + .let { requireNotNull(it) { "exponentialBackoffFactor expected to be greater than 0, but was $exponentialBackoffFactor" } }, + maxResetTimeoutNanos = maxResetTimeout + .takeIf { it.isPositive() && it != Duration.ZERO } + .let { requireNotNull(it) { "maxResetTimeout expected to be greater than ${Duration.ZERO}, but was $maxResetTimeout" } } + .toDouble(DurationUnit.NANOSECONDS), + onRejected = onRejected, + onClosed = onClosed, + onHalfOpen = onHalfOpen, + onOpen = onOpen ) } } diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index a3301b65ee1..399c6a76797 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -5,6 +5,8 @@ import arrow.core.continuations.update import arrow.core.identity import arrow.core.prependTo import kotlinx.coroutines.CancellationException +import arrow.fx.coroutines.ExitCase.Companion.ExitCase +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow @@ -441,6 +443,52 @@ public fun Resource.asFlow(): Flow = } } +/** + * Deconstruct [Resource] into an [A] and a `release` handler. + * The `release` action **must** always be called, if never called, then the resource [A] will leak. + * The `release` step is already made `NonCancellable` to guarantee correct invocation like `Resource` or `bracketCase`, + * and it will automatically rethrow, and compose, the exceptions as needed. + * + * ```kotlin + * import arrow.fx.coroutines.* + * import arrow.fx.coroutines.ExitCase.Companion.ExitCase + * + * val resource = + * resource({ "Acquire" }) { _, exitCase -> println("Release $exitCase") } + * + * suspend fun main(): Unit { + * val (acquired: String, release: suspend (ExitCase) -> Unit) = resource.allocated() + * try { + * /** Do something with A */ + * release(ExitCase.Completed) + * } catch(e: Throwable) { + * release(ExitCase(e)) + * } + * } + * ``` + * + * + * This is a **delicate** API. It is easy to accidentally create resource or memory leaks `allocated` is used. + * A `Resource` allocated by `allocated` is not subject to the guarantees that [Resource] makes, + * instead the caller is responsible for correctly invoking the `release` handler at the appropriate time. + * This API is useful for building inter-op APIs between [Resource] and non-suspending code, such as Java libraries. + */ +@DelicateCoroutinesApi +public suspend fun Resource.allocated(): Pair Unit> { + val effect = ResourceScopeImpl() + val allocated: A = invoke(effect) + val release: suspend (ExitCase) -> Unit = { e -> + val suppressed: Throwable? = effect.cancelAll(e) + val original: Throwable? = when(e) { + ExitCase.Completed -> null + is ExitCase.Cancelled -> e.exception + is ExitCase.Failure -> e.failure + } + Platform.composeErrors(original, suppressed)?.let { throw it } + } + return Pair(allocated, release) +} + @JvmInline private value class ResourceScopeImpl( private val finalizers: AtomicRef Unit>> = AtomicRef(emptyList()), diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/CircuitBreakerTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/CircuitBreakerTest.kt index 265b318e453..3561a4a1629 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/CircuitBreakerTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/CircuitBreakerTest.kt @@ -2,6 +2,7 @@ package arrow.fx.coroutines import arrow.core.Either import arrow.core.test.stackSafeIteration +import io.kotest.assertions.asClue import io.kotest.assertions.fail import io.kotest.assertions.throwables.shouldThrow import io.kotest.matchers.shouldBe @@ -10,9 +11,11 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.withContext +import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.minutes -import kotlin.time.DurationUnit.NANOSECONDS +import kotlin.time.Duration.Companion.seconds +import kotlin.time.DurationUnit import kotlin.time.ExperimentalTime @ExperimentalTime @@ -79,7 +82,7 @@ class CircuitBreakerTest : ArrowFxSpec( when (val s = cb.state()) { is CircuitBreaker.State.Open -> { - s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS) + s.resetTimeout shouldBe resetTimeout } else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s") } @@ -106,7 +109,7 @@ class CircuitBreakerTest : ArrowFxSpec( when (val s = cb.state()) { is CircuitBreaker.State.Open -> { - s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS) + s.resetTimeout shouldBe resetTimeout } else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s") } @@ -121,7 +124,7 @@ class CircuitBreakerTest : ArrowFxSpec( when (val s = cb.state()) { is CircuitBreaker.State.Open -> { - s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS) + s.resetTimeout shouldBe resetTimeout } else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s") } @@ -143,7 +146,7 @@ class CircuitBreakerTest : ArrowFxSpec( when (val s = cb.state()) { is CircuitBreaker.State.HalfOpen -> { - s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS) + s.resetTimeout shouldBe resetTimeout } else -> fail("Invalid state: Expect CircuitBreaker.State.HalfOpen but found $s") } @@ -186,7 +189,7 @@ class CircuitBreakerTest : ArrowFxSpec( when (val s = cb.state()) { is CircuitBreaker.State.Open -> { - s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS) + s.resetTimeout shouldBe resetTimeout } else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s") } @@ -201,7 +204,7 @@ class CircuitBreakerTest : ArrowFxSpec( when (val s = cb.state()) { is CircuitBreaker.State.Open -> { - s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS) + s.resetTimeout shouldBe resetTimeout } else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s") } @@ -226,7 +229,7 @@ class CircuitBreakerTest : ArrowFxSpec( when (val s = cb.state()) { is CircuitBreaker.State.HalfOpen -> { - s.resetTimeoutNanos shouldBe resetTimeout.toDouble(NANOSECONDS) + s.resetTimeout shouldBe resetTimeout } else -> fail("Invalid state: Expect CircuitBreaker.State.HalfOpen but found $s") } @@ -243,7 +246,7 @@ class CircuitBreakerTest : ArrowFxSpec( // resetTimeout should've applied when (val s = cb.state()) { is CircuitBreaker.State.Open -> { - s.resetTimeoutNanos shouldBe (resetTimeout * exponentialBackoffFactor).toDouble(NANOSECONDS) + s.resetTimeout shouldBe resetTimeout * exponentialBackoffFactor } else -> fail("Invalid state: Expect CircuitBreaker.State.Open but found $s") } @@ -267,9 +270,52 @@ class CircuitBreakerTest : ArrowFxSpec( stackSafeIteration(), 0 ) shouldBe stackSafeIteration() } + + listOf( + ConstructorValues(maxFailures = -1), + ConstructorValues(resetTimeout = Duration.ZERO), + ConstructorValues(resetTimeout = (-1).seconds), + ConstructorValues(exponentialBackoffFactor = 0.0), + ConstructorValues(exponentialBackoffFactor = -1.0), + ConstructorValues(maxResetTimeout = Duration.ZERO), + ConstructorValues(maxResetTimeout = (-1).seconds), + ).forEach { value -> + "should require valid constructor values" { + value.asClue { (maxFailures, resetTimeout, exponentialBackoffFactor, maxResetTimeout) -> + shouldThrow { + CircuitBreaker.of(maxFailures, resetTimeout, exponentialBackoffFactor, maxResetTimeout) + } + + shouldThrow { + CircuitBreaker.of( + maxFailures, + resetTimeout.toDouble(DurationUnit.NANOSECONDS), + exponentialBackoffFactor, + maxResetTimeout.toDouble(DurationUnit.NANOSECONDS) + ) + } + + shouldThrow { + CircuitBreaker.of( + maxFailures, + resetTimeout, + exponentialBackoffFactor, + maxResetTimeout + ) + } + } + } + } } ) +private data class ConstructorValues( + val maxFailures: Int = 1, + val resetTimeout: Duration = 1.seconds, + val exponentialBackoffFactor: Double = 1.0, + val maxResetTimeout: Duration = Duration.INFINITE, +) + /** * Recurs the effect [n] times, and collects the output along the way for easy asserting. */ diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScopeSpec.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt similarity index 78% rename from arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScopeSpec.kt rename to arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt index 478809b523f..dd5dfb9d74b 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ScopeSpec.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt @@ -3,27 +3,37 @@ package arrow.fx.coroutines import arrow.core.Either import arrow.core.continuations.either import arrow.core.left +import arrow.fx.coroutines.ExitCase.Companion.ExitCase import io.kotest.assertions.fail import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.StringSpec +import io.kotest.inspectors.forAll import io.kotest.matchers.collections.shouldContainExactly +import io.kotest.matchers.nulls.shouldNotBeNull import io.kotest.matchers.should import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb +import io.kotest.property.Exhaustive import io.kotest.property.arbitrary.bool +import io.kotest.property.arbitrary.element import io.kotest.property.arbitrary.int import io.kotest.property.arbitrary.list +import io.kotest.property.arbitrary.map import io.kotest.property.arbitrary.negativeInt +import io.kotest.property.arbitrary.orNull import io.kotest.property.arbitrary.positiveInt import io.kotest.property.checkAll import io.kotest.property.arbitrary.string +import io.kotest.property.exhaustive.collection +import io.kotest.property.exhaustive.of import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList +import kotlin.random.Random class ResourceTest : StringSpec({ @@ -131,8 +141,8 @@ class ResourceTest : StringSpec({ "parZip - success" { suspend fun ResourceScope.closeable(): CheckableAutoClose = - install({ CheckableAutoClose() } ) { a: CheckableAutoClose, _: ExitCase -> a.close() } - + install({ CheckableAutoClose() }) { a: CheckableAutoClose, _: ExitCase -> a.close() } + resourceScope { parZip({ (1..depth).map { closeable() } @@ -238,8 +248,9 @@ class ResourceTest : StringSpec({ resourceScope { parZip({ install({ - require(started.complete(Unit)) - i }, { ii: Int, ex: ExitCase -> + require(started.complete(Unit)) + i + }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) }) }, { @@ -269,8 +280,9 @@ class ResourceTest : StringSpec({ throw cancel }, { install({ - require(started.complete(Unit)) - i }, { ii: Int, ex: ExitCase -> + require(started.complete(Unit)) + i + }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) }) }) { _, _ -> } @@ -292,8 +304,9 @@ class ResourceTest : StringSpec({ resourceScope { parZip({ install({ - require(started.complete(Unit)) - i }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) } + require(started.complete(Unit)) + i + }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) } ) }, { started.await() @@ -322,7 +335,8 @@ class ResourceTest : StringSpec({ }, { install({ require(started.complete(Unit)) - i }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) } + i + }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) } ) }) { _, _ -> } fail("It should never reach here") @@ -345,7 +359,7 @@ class ResourceTest : StringSpec({ parZip({ install({ i }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) }) }, { - install({ }, { _: Unit, _: ExitCase -> throw cancel }) + install({ }, { _: Unit, _: ExitCase -> throw cancel }) }) { _, _ -> } } } @@ -364,7 +378,7 @@ class ResourceTest : StringSpec({ shouldThrow { resourceScope { parZip({ - install({ }, { _: Unit, _: ExitCase -> throw cancel }) + install({ }, { _: Unit, _: ExitCase -> throw cancel }) }, { install({ i }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) }) }) { _, _ -> } @@ -386,7 +400,7 @@ class ResourceTest : StringSpec({ parZip({ install({ i }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) }) }, { - install({ }, { _: Unit, _: ExitCase -> throw throwable }) + install({ }, { _: Unit, _: ExitCase -> throw throwable }) }) { _, _ -> } } } shouldBe throwable @@ -404,7 +418,7 @@ class ResourceTest : StringSpec({ shouldThrow { resourceScope { parZip({ - install({ }, { _: Unit, _: ExitCase -> throw throwable }) + install({ }, { _: Unit, _: ExitCase -> throw throwable }) }, { install({ i }, { ii: Int, ex: ExitCase -> require(released.complete(ii to ex)) }) }) { _, _ -> } @@ -425,9 +439,9 @@ class ResourceTest : StringSpec({ shouldThrow { resourceScope { parZip({ - install({ a } ) { aa: Int, ex: ExitCase -> require(releasedA.complete(aa to ex)) } + install({ a }) { aa: Int, ex: ExitCase -> require(releasedA.complete(aa to ex)) } }, { - install({ b } ) { bb: Int, ex: ExitCase -> require(releasedB.complete(bb to ex)) } + install({ b }) { bb: Int, ex: ExitCase -> require(releasedB.complete(bb to ex)) } }) { _, _ -> } throw throwable } @@ -451,9 +465,9 @@ class ResourceTest : StringSpec({ shouldThrow { resourceScope { parZip({ - install({ a } ) { aa: Int, ex: ExitCase -> require(releasedA.complete(aa to ex)) } + install({ a }) { aa: Int, ex: ExitCase -> require(releasedA.complete(aa to ex)) } }, { - install({ b } ) { bb: Int, ex: ExitCase -> require(releasedB.complete(bb to ex)) } + install({ b }) { bb: Int, ex: ExitCase -> require(releasedB.complete(bb to ex)) } }) { _, _ -> } throw CancellationException("") } @@ -505,4 +519,72 @@ class ResourceTest : StringSpec({ released.await().shouldBeTypeOf() } } + + "allocated" { + checkAll(Arb.int()) { seed -> + val released = CompletableDeferred() + val (allocate, release) = resource({ seed }) { _, exitCase -> released.complete(exitCase) } + .allocated() + + allocate shouldBe seed + release(ExitCase.Completed) + released.await() shouldBe ExitCase.Completed + } + } + + "allocated - suppressed exception" { + checkAll( + Arb.int(), + Arb.string().map(::RuntimeException), + Arb.string().map(::IllegalStateException) + ) { seed, original, suppressed -> + val released = CompletableDeferred() + val (allocate, release) = + resource({ seed }) { _, exitCase -> + released.complete(exitCase) + throw suppressed + }.allocated() + + val exception = shouldThrow { + try { + allocate shouldBe seed + throw original + } catch (e: Throwable) { + release(ExitCase(e)) + } + } + + exception shouldBe original + exception.suppressedExceptions.firstOrNull().shouldNotBeNull() shouldBe suppressed + released.await().shouldBeTypeOf() + } + } + + "allocated - cancellation exception" { + checkAll( + Arb.int(), + Arb.string().map { CancellationException(it, null) }, + Arb.string().map(::IllegalStateException) + ) { seed, cancellation, suppressed -> + val released = CompletableDeferred() + val (allocate, release) = + resource({ seed }) { _, exitCase -> + released.complete(exitCase) + throw suppressed + }.allocated() + + val exception = shouldThrow { + try { + allocate shouldBe seed + throw cancellation + } catch (e: Throwable) { + release(ExitCase(e)) + } + } + + exception shouldBe cancellation + exception.suppressedExceptions.firstOrNull().shouldNotBeNull() shouldBe suppressed + released.await().shouldBeTypeOf() + } + } }) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-09.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-09.kt new file mode 100644 index 00000000000..c2642bb90dd --- /dev/null +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/examples/example-resource-09.kt @@ -0,0 +1,18 @@ +// This file was automatically generated from Resource.kt by Knit tool. Do not edit. +package arrow.fx.coroutines.examples.exampleResource09 + +import arrow.fx.coroutines.* +import arrow.fx.coroutines.ExitCase.Companion.ExitCase + +val resource = + resource({ "Acquire" }) { _, exitCase -> println("Release $exitCase") } + +suspend fun main(): Unit { + val (acquired: String, release: suspend (ExitCase) -> Unit) = resource.allocated() + try { + /** Do something with A */ + release(ExitCase.Completed) + } catch(e: Throwable) { + release(ExitCase(e)) + } +}