diff --git a/build.sbt b/build.sbt index 2e140de51b..dd34d3be4a 100644 --- a/build.sbt +++ b/build.sbt @@ -915,7 +915,10 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform) // introduced by #3409 // extracted UnsafeUnbounded private data structure ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$UnsafeUnbounded"), - ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$UnsafeUnbounded$Cell") + ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$UnsafeUnbounded$Cell"), + // introduced by #3480 + // adds method to sealed Hotswap + ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.std.Hotswap.get") ) ) .jsSettings( diff --git a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala index 8f620475cf..15ac983442 100644 --- a/std/shared/src/main/scala/cats/effect/std/Hotswap.scala +++ b/std/shared/src/main/scala/cats/effect/std/Hotswap.scala @@ -64,11 +64,20 @@ sealed trait Hotswap[F[_], R] { * is not used thereafter. Failure to do so may result in an error on the _consumer_ side. In * any case, no resources will be leaked. * + * For safer access to the current resource see [[get]], which guarantees that it will not be + * released while it is being used. + * * If [[swap]] is called after the lifetime of the [[Hotswap]] is over, it will raise an * error, but will ensure that all resources are finalized before returning. */ def swap(next: Resource[F, R]): F[R] + /** + * Gets the current resource, if it exists. The returned resource is guaranteed to be + * available for the duration of the returned resource. + */ + def get: Resource[F, Option[R]] + /** * Pops and runs the finalizer of the current resource, if it exists. * @@ -92,45 +101,72 @@ object Hotswap { * Creates a new [[Hotswap]], which represents a [[cats.effect.kernel.Resource]] that can be * swapped during the lifetime of this [[Hotswap]]. */ - def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap[F, R]] = { - type State = Option[F[Unit]] - - def initialize: F[Ref[F, State]] = - F.ref(Some(F.pure(()))) - - def finalize(state: Ref[F, State]): F[Unit] = - state.getAndSet(None).flatMap { - case Some(finalizer) => finalizer - case None => raise("Hotswap already finalized") - } - - def raise(message: String): F[Unit] = - F.raiseError[Unit](new RuntimeException(message)) - - Resource.make(initialize)(finalize).map { state => - new Hotswap[F, R] { - - override def swap(next: Resource[F, R]): F[R] = - F.uncancelable { poll => - poll(next.allocated).flatMap { - case (r, finalizer) => - swapFinalizer(finalizer).as(r) + def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap[F, R]] = + Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore => + sealed abstract class State + case object Cleared extends State + case class Acquired(r: R, fin: F[Unit]) extends State + case object Finalized extends State + + def initialize: F[Ref[F, State]] = + F.ref(Cleared) + + def finalize(state: Ref[F, State]): F[Unit] = + state.getAndSet(Finalized).flatMap { + case Acquired(_, finalizer) => finalizer + case Cleared => F.unit + case Finalized => raise("Hotswap already finalized") + } + + def raise(message: String): F[Unit] = + F.raiseError[Unit](new RuntimeException(message)) + + def shared: Resource[F, Unit] = semaphore.permit + + def exclusive: Resource[F, Unit] = + Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ => + semaphore.releaseN(Long.MaxValue)) + + Resource.make(initialize)(finalize).map { state => + new Hotswap[F, R] { + + override def swap(next: Resource[F, R]): F[R] = + exclusive.surround { + F.uncancelable { poll => + poll(next.allocated).flatMap { + case (r, fin) => + swapFinalizer(Acquired(r, fin)).as(r) + } + } } - } - - override def clear: F[Unit] = - swapFinalizer(F.unit).uncancelable - private def swapFinalizer(next: F[Unit]): F[Unit] = - state.modify { - case Some(previous) => - Some(next) -> previous - case None => - None -> (next *> raise("Cannot swap after finalization")) - }.flatten + override def get: Resource[F, Option[R]] = + shared.evalMap { _ => + state.get.map { + case Acquired(r, _) => Some(r) + case _ => None + } + } + override def clear: F[Unit] = + exclusive.surround(swapFinalizer(Cleared).uncancelable) + + private def swapFinalizer(next: State): F[Unit] = + state.modify { + case Acquired(_, fin) => + next -> fin + case Cleared => + next -> F.unit + case Finalized => + val fin = next match { + case Acquired(_, fin) => fin + case _ => F.unit + } + Finalized -> (fin *> raise("Cannot swap after finalization")) + }.flatten + + } } } - } } diff --git a/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala b/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala index e8bfc8b6bc..67ef946db8 100644 --- a/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala @@ -20,6 +20,8 @@ package std import cats.effect.kernel.Ref +import scala.concurrent.duration._ + class HotswapSpec extends BaseSpec { outer => sequential @@ -75,6 +77,33 @@ class HotswapSpec extends BaseSpec { outer => } } } + + "not release current resource while it is in use" in ticked { implicit ticker => + val r = Resource.make(IO.ref(true))(_.set(false)) + val go = Hotswap.create[IO, Ref[IO, Boolean]].use { hs => + hs.swap(r) *> (IO.sleep(1.second) *> hs.clear).background.surround { + hs.get.use { + case Some(ref) => + val notReleased = ref.get.flatMap(b => IO(b must beTrue)) + notReleased *> IO.sleep(2.seconds) *> notReleased.void + case None => IO(false must beTrue).void + } + } + } + + go must completeAs(()) + } + + "resource can be accessed concurrently" in ticked { implicit ticker => + val go = Hotswap.create[IO, Unit].use { hs => + hs.swap(Resource.unit) *> + hs.get.useForever.background.surround { + IO.sleep(1.second) *> hs.get.use_ + } + } + + go must completeAs(()) + } } }