Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Fixes to Schedule #184

Merged
merged 10 commits into from
Jun 16, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,8 @@ sealed class Schedule<Input, Output> {

/**
* Create a schedule that never retries.
*
* Note that this will hang a program if used as a repeat/retry schedule unless cancelled.
*/
fun <A> never(): Schedule<A, Nothing> =
invoke(suspend { arrow.fx.coroutines.never<Unit>() }) { _, _ ->
Expand Down
43 changes: 26 additions & 17 deletions arrow-fx/src/main/kotlin/arrow/fx/Schedule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import arrow.core.right
import arrow.core.some
import arrow.core.toT
import arrow.fx.Schedule.Companion.withMonad
import arrow.fx.typeclasses.Async
import arrow.fx.typeclasses.Concurrent
import arrow.fx.typeclasses.Duration
import arrow.fx.typeclasses.MonadDefer
Expand Down Expand Up @@ -401,7 +402,7 @@ sealed class Schedule<F, Input, Output> : ScheduleOf<F, Input, Output> {

/**
* Add random jitter to a schedule.
* The argument [genRand] is supposed to be a computation with when run returns
* The argument [genRand] is supposed to be a computation that returns
* doubles. An example would be the following [IO] `IO { Random.nextDouble() }`.
*
* This is done to push the source of randomness to the caller which makes the function
Expand Down Expand Up @@ -690,16 +691,21 @@ sealed class Schedule<F, Input, Output> : ScheduleOf<F, Input, Output> {
unfoldM(M, M.just(c)) { M.just(f(it)) }

/**
* Create a schedule that continues forever and returns the number of iterations.
* Create a schedule that continues forever and returns the number of repetitions.
*/
fun <F, A> forever(M: Monad<F>): Schedule<F, A, Int> =
unfold(M, 0) { it + 1 }

/**
* Create a schedule that continues n times and returns the number of iterations.
* Create a schedule that continues n times and returns the number of repetitions.
*/
fun <F, A> recurs(M: Monad<F>, n: Int): Schedule<F, A, Int> =
forever<F, A>(M).whileOutput { it <= n }
invoke(M, M.just(0)) { _, acc ->
M.just(
if (acc < n) Decision.cont(0.seconds, acc + 1, Eval.now(acc + 1))
else Decision.done(0.seconds, acc, Eval.now(acc))
)
}

/**
* Create a schedule that only ever retries once.
Expand All @@ -708,10 +714,15 @@ sealed class Schedule<F, Input, Output> : ScheduleOf<F, Input, Output> {

/**
* Create a schedule that never retries.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make clear here that it'll never return from retry or repeat until canceled.

This is useful in similar situations as never where you need to explicitly suspend until canceled or interrupted. You could compose it at the end of a Schedule using andThen to suspend after a Schedule finishes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a note "Note that this will hang a program if used as a repeat/retry schedule unless cancelled."

* This is a difference with zio, where they define never as a schedule that itself never executes.
*
* Note that this will hang a program if used as a repeat/retry schedule unless cancelled.
*/
fun <F, A> never(M: Monad<F>): Schedule<F, A, Nothing> =
recurs<F, A>(M, 0).map { throw IllegalStateException("Impossible") }
fun <F, A> never(AS: Async<F>): Schedule<F, A, Nothing> =
invoke(AS, AS.never<A>()) { a, _ ->
AS.later {
Decision(false, 0.nanoseconds, a, Eval.later { throw IllegalStateException("Impossible") })
}
}

/**
* Create a schedule that uses another schedule to generate the delay of this schedule.
Expand Down Expand Up @@ -862,8 +873,6 @@ sealed class Schedule<F, Input, Output> : ScheduleOf<F, Input, Output> {

fun <A> exponential(base: Duration, factor: Double = 2.0): Schedule<M, A, Duration> =
exponential(MM(), base, factor)

fun <A> never(): Schedule<M, A, Nothing> = never(MM())
}

/**
Expand Down Expand Up @@ -940,15 +949,15 @@ fun <F, E, A, B, C> Kind<F, A>.repeatOrElseEither(

fun loop(last: A, state: Any?): Kind<F, Either<C, B>> =
schedule.update(last, state)
.flatMap { desc ->
if (desc.cont)
flatMap { a -> T.sleep(desc.delay).flatMap { loop(a, desc.state) } }
.handleErrorWith { e -> orElse(e, desc.finish.value().some()).map { Left(it) } }
else just(desc.finish.value().right())
.flatMap { step ->
if (step.cont)
T.sleep(step.delay).followedBy(this@repeatOrElseEither).flatMap { a -> loop(a, step.state) }
.handleErrorWith { e -> orElse(e, step.finish.value().some()).map(::Left) }
else just(step.finish.value().right())
}

return flatMap { a -> schedule.initialState.flatMap { b -> loop(a, b) } }
.handleErrorWith { e -> orElse(e, None).map { Left(it) } }
return this@repeatOrElseEither.flatMap { a -> schedule.initialState.flatMap { b -> loop(a, b) } }
.handleErrorWith { e -> orElse(e, None).map(::Left) }
}

/**
Expand Down Expand Up @@ -1014,7 +1023,7 @@ fun <F, E, A, B, C> Kind<F, A>.retryOrElseEither(
(schedule as Schedule.ScheduleImpl<F, Any?, E, B>)

fun loop(state: Any?): Kind<F, Either<C, A>> =
flatMap { just(it.right()) }
this@retryOrElseEither.map { it.right() }
.handleErrorWith { e ->
schedule.update(e, state)
.flatMap { dec ->
Expand Down
21 changes: 0 additions & 21 deletions arrow-fx/src/main/kotlin/arrow/fx/extensions/schedule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import arrow.fx.ForSchedule
import arrow.fx.Schedule
import arrow.fx.SchedulePartialOf
import arrow.fx.fix
import arrow.typeclasses.Alternative
import arrow.typeclasses.Applicative
import arrow.typeclasses.Apply
import arrow.typeclasses.Category
Expand All @@ -17,7 +16,6 @@ import arrow.typeclasses.Contravariant
import arrow.typeclasses.Functor
import arrow.typeclasses.Monad
import arrow.typeclasses.Monoid
import arrow.typeclasses.MonoidK
import arrow.typeclasses.Profunctor
import arrow.typeclasses.Semigroup
import arrow.typeclasses.SemigroupK
Expand Down Expand Up @@ -71,25 +69,6 @@ interface ScheduleSemigroupK<F, Input> : SemigroupK<SchedulePartialOf<F, Input>>
fix().andThen(y.fix()).map { it.fold(::identity, ::identity) }
}

@extension
interface ScheduleMonoidK<F, Input> : MonoidK<SchedulePartialOf<F, Input>>, ScheduleSemigroupK<F, Input> {
fun MF(): Monad<F>

override fun <A> empty(): Kind<SchedulePartialOf<F, Input>, A> =
Schedule.never(MF())
}

@extension
interface ScheduleAlternative<F, Input> : Alternative<SchedulePartialOf<F, Input>>, ScheduleApplicative<F, Input>, ScheduleMonoidK<F, Input> {
override fun MF(): Monad<F>

override fun <A> Kind<SchedulePartialOf<F, Input>, A>.orElse(b: Kind<SchedulePartialOf<F, Input>, A>): Kind<SchedulePartialOf<F, Input>, A> =
fix().andThen(b.fix()).map { it.fold(::identity, ::identity) }

override fun <A> Kind<SchedulePartialOf<F, Input>, A>.combineK(y: Kind<SchedulePartialOf<F, Input>, A>): Kind<SchedulePartialOf<F, Input>, A> =
orElse(y)
}

@extension
interface ScheduleContravariant<F, Output> : Contravariant<Conested<Kind<ForSchedule, F>, Output>> {
override fun <A, B> Kind<Conested<Kind<ForSchedule, F>, Output>, A>.contramap(f: (B) -> A): Kind<Conested<Kind<ForSchedule, F>, Output>, B> =
Expand Down
74 changes: 48 additions & 26 deletions arrow-fx/src/test/kotlin/arrow/fx/ScheduleTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import arrow.Kind2
import arrow.core.Either
import arrow.core.ForId
import arrow.core.Id
import arrow.core.None
import arrow.core.extensions.eq
import arrow.core.extensions.id.eqK.eqK
import arrow.core.extensions.id.monad.monad
Expand All @@ -16,18 +17,17 @@ import arrow.core.test.generators.GenK
import arrow.core.test.generators.GenK2
import arrow.core.test.generators.applicative
import arrow.core.test.generators.intSmall
import arrow.core.test.laws.AlternativeLaws
import arrow.core.test.laws.ApplicativeLaws
import arrow.core.test.laws.CategoryLaws
import arrow.core.test.laws.MonoidLaws
import arrow.core.test.laws.ProfunctorLaws
import arrow.core.toT
import arrow.core.value
import arrow.fx.extensions.io.applicativeError.attempt
import arrow.fx.extensions.io.async.async
import arrow.fx.extensions.io.monad.monad
import arrow.fx.extensions.io.monadDefer.monadDefer
import arrow.fx.extensions.io.monadThrow.monadThrow
import arrow.fx.extensions.schedule.alternative.alternative
import arrow.fx.extensions.schedule.applicative.applicative
import arrow.fx.extensions.schedule.category.category
import arrow.fx.extensions.schedule.monoid.monoid
Expand All @@ -36,6 +36,7 @@ import arrow.fx.test.eq.eqK
import arrow.fx.test.generators.genK
import arrow.fx.test.laws.forFew
import arrow.fx.typeclasses.Duration
import arrow.fx.typeclasses.milliseconds
import arrow.fx.typeclasses.seconds
import arrow.typeclasses.Eq
import arrow.typeclasses.EqK
Expand Down Expand Up @@ -116,11 +117,6 @@ class ScheduleTest : UnitSpec() {
Schedule.genK<ForId, Int>(Id.monad()).genK(Gen.int()).map { it.fix() },
eqK(Id.eqK(), Id.monad(), 0).liftEq(Int.eq())
),
AlternativeLaws.laws(
Schedule.alternative<ForId, Int>(Id.monad()),
Schedule.genK<ForId, Int>(Id.monad()),
eqK(Id.eqK(), Id.monad(), 0)
),
ProfunctorLaws.laws(
Schedule.profunctor(),
Schedule.genK2(Id.monad()),
Expand Down Expand Up @@ -164,8 +160,21 @@ class ScheduleTest : UnitSpec() {
val res = Schedule.recurs<ForId, Int>(Id.monad(), i).runIdSchedule(0, i + 1)

if (i < 0) res.isEmpty()
else res.dropLast(1).forAll { it.cont && it.delay.amount == 0L } &&
res.last().let { it.cont.not() && it.delay.amount == 0L && it.finish.value() == i + 1 && it.state == i + 1 }
else {
res.dropLast(1).forEach {
it.cont shouldBe true
it.delay.amount shouldBe 0L
}

res.last().let {
it.cont shouldBe false
it.delay.amount shouldBe 0L
it.finish.value() shouldBe i
it.state shouldBe i
}

true
}
}
}

Expand All @@ -175,10 +184,12 @@ class ScheduleTest : UnitSpec() {
}
}

"Schedule.never() == Schedule.recurs(0)" {
scheduleEq.run {
Schedule.never<ForId, Any?>(Id.monad()).eqv(Schedule.recurs(Id.monad(), 0)) shouldBe true
}
"Schedule.never() should execute once and timeout" {
val sideEffect = SideEffect()
IO { sideEffect.increment() }.repeat(Schedule.never(IO.async()))
.unsafeRunTimed(50.milliseconds) shouldBe None

sideEffect.counter shouldBe 1
}

"Schedule.spaced()" {
Expand Down Expand Up @@ -225,7 +236,7 @@ class ScheduleTest : UnitSpec() {
}

"repeat" {
forAll(Schedule.Decision.genK().genK(Gen.int()), Gen.intSmall().filter { it < 100 }.filter { it >= 0 }) { dec, n ->
forAll(Schedule.Decision.genK().genK(Gen.int()), Gen.intSmall().filter { it in 0..99 }) { dec, n ->
val schedule = Schedule(IO.monad(), IO.just(0 as Any?)) { _: Unit, _ -> IO.just(dec.fix()) }

val eff = SideEffect()
Expand All @@ -236,12 +247,18 @@ class ScheduleTest : UnitSpec() {
.attempt()
.fix().unsafeRunSync()

if (dec.fix().cont || n == 0) res.isLeft() &&
ref.get().fix().unsafeRunSync().nanoseconds == max(n - 1, 0) * dec.fix().delay.nanoseconds &&
eff.counter == n
else res.isRight() && eff.counter == 1 &&
ref.get().fix().unsafeRunSync().nanoseconds == 0L &&
(res as Either.Right).b == dec.fix().finish.value()
if (dec.fix().cont || n == 0) {
res.isLeft() shouldBe true
ref.get().fix().unsafeRunSync().nanoseconds shouldBe max(n, 0) * dec.fix().delay.nanoseconds
eff.counter shouldBe n
} else {
res.isRight() shouldBe true
eff.counter shouldBe 1
ref.get().fix().unsafeRunSync().nanoseconds shouldBe 0L
(res as Either.Right).b shouldBe dec.fix().finish.value()
}

true
}
}

Expand All @@ -261,12 +278,17 @@ class ScheduleTest : UnitSpec() {
.attempt()
.fix().unsafeRunSync()

if (dec.fix().cont) res.isRight() &&
eff.counter == n + 1 &&
ref.get().fix().unsafeRunSync().nanoseconds == (n + 1) * dec.fix().delay.nanoseconds
else res.isLeft() &&
eff.counter == 1 &&
ref.get().fix().unsafeRunSync().nanoseconds == 0L
if (dec.fix().cont) {
res.isRight() shouldBe true
eff.counter shouldBe n + 1
ref.get().fix().unsafeRunSync().nanoseconds shouldBe (n + 1) * dec.fix().delay.nanoseconds
} else {
res.isLeft() shouldBe true
eff.counter shouldBe 1
ref.get().fix().unsafeRunSync().nanoseconds shouldBe 0L
}

true
}
}
}
Expand Down