Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,10 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform)
// introduced by #3409
// extracted UnsafeUnbounded private data structure
ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$UnsafeUnbounded"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$UnsafeUnbounded$Cell")
ProblemFilters.exclude[MissingClassProblem]("cats.effect.std.Queue$UnsafeUnbounded$Cell"),
// introduced by #3480
// adds method to sealed Hotswap
ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.std.Hotswap.get")
)
)
.jsSettings(
Expand Down
106 changes: 71 additions & 35 deletions std/shared/src/main/scala/cats/effect/std/Hotswap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,20 @@ sealed trait Hotswap[F[_], R] {
* is not used thereafter. Failure to do so may result in an error on the _consumer_ side. In
* any case, no resources will be leaked.
*
* For safer access to the current resource see [[get]], which guarantees that it will not be
* released while it is being used.
*
* If [[swap]] is called after the lifetime of the [[Hotswap]] is over, it will raise an
* error, but will ensure that all resources are finalized before returning.
*/
def swap(next: Resource[F, R]): F[R]
Comment on lines +67 to 73
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is a bit unsafey since it directly returns the R but I think we'll just have to live with that.


/**
* Gets the current resource, if it exists. The returned resource is guaranteed to be
* available for the duration of the returned resource.
*/
def get: Resource[F, Option[R]]
Comment on lines +75 to +79
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a little bit annoying UX that this is an Option[R], alternatively we could raise an error. But the user can decide to do that too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bikeshed the name. access a la HotswapRef ?


/**
* Pops and runs the finalizer of the current resource, if it exists.
*
Expand All @@ -92,45 +101,72 @@ object Hotswap {
* Creates a new [[Hotswap]], which represents a [[cats.effect.kernel.Resource]] that can be
* swapped during the lifetime of this [[Hotswap]].
*/
def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap[F, R]] = {
type State = Option[F[Unit]]

def initialize: F[Ref[F, State]] =
F.ref(Some(F.pure(())))

def finalize(state: Ref[F, State]): F[Unit] =
state.getAndSet(None).flatMap {
case Some(finalizer) => finalizer
case None => raise("Hotswap already finalized")
}

def raise(message: String): F[Unit] =
F.raiseError[Unit](new RuntimeException(message))

Resource.make(initialize)(finalize).map { state =>
new Hotswap[F, R] {

override def swap(next: Resource[F, R]): F[R] =
F.uncancelable { poll =>
poll(next.allocated).flatMap {
case (r, finalizer) =>
swapFinalizer(finalizer).as(r)
def create[F[_], R](implicit F: Concurrent[F]): Resource[F, Hotswap[F, R]] =
Resource.eval(Semaphore[F](Long.MaxValue)).flatMap { semaphore =>
sealed abstract class State
case object Cleared extends State
case class Acquired(r: R, fin: F[Unit]) extends State
case object Finalized extends State

def initialize: F[Ref[F, State]] =
F.ref(Cleared)

def finalize(state: Ref[F, State]): F[Unit] =
state.getAndSet(Finalized).flatMap {
case Acquired(_, finalizer) => finalizer
case Cleared => F.unit
case Finalized => raise("Hotswap already finalized")
}

def raise(message: String): F[Unit] =
F.raiseError[Unit](new RuntimeException(message))

def shared: Resource[F, Unit] = semaphore.permit

def exclusive: Resource[F, Unit] =
Resource.makeFull[F, Unit](poll => poll(semaphore.acquireN(Long.MaxValue)))(_ =>
semaphore.releaseN(Long.MaxValue))
Comment on lines +124 to +128
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stole this from here:
https://github.com/janstenpickle/hotswap-ref/blob/34781a0af15784981cd007b74f0f5c31836d73f3/modules/core/src/main/scala/io/janstenpickle/hotswapref/Lock.scala

It seems like a good addition to std on its own merits.

trait Lock[F[_]] {
  def shared: Resource[F, Unit]
  def exclusive: Resource[F, Unit]
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lock would probably be a good addition, but the tricky bit is that it isn't reentrant, so we have to be really careful.


Resource.make(initialize)(finalize).map { state =>
new Hotswap[F, R] {

override def swap(next: Resource[F, R]): F[R] =
exclusive.surround {
F.uncancelable { poll =>
poll(next.allocated).flatMap {
case (r, fin) =>
swapFinalizer(Acquired(r, fin)).as(r)
}
}
}
}

override def clear: F[Unit] =
swapFinalizer(F.unit).uncancelable

private def swapFinalizer(next: F[Unit]): F[Unit] =
state.modify {
case Some(previous) =>
Some(next) -> previous
case None =>
None -> (next *> raise("Cannot swap after finalization"))
}.flatten
override def get: Resource[F, Option[R]] =
shared.evalMap { _ =>
state.get.map {
case Acquired(r, _) => Some(r)
case _ => None
}
}

override def clear: F[Unit] =
exclusive.surround(swapFinalizer(Cleared).uncancelable)

private def swapFinalizer(next: State): F[Unit] =
state.modify {
case Acquired(_, fin) =>
next -> fin
case Cleared =>
next -> F.unit
case Finalized =>
val fin = next match {
case Acquired(_, fin) => fin
case _ => F.unit
}
Finalized -> (fin *> raise("Cannot swap after finalization"))
}.flatten

}
}
}
}

}
29 changes: 29 additions & 0 deletions tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package std

import cats.effect.kernel.Ref

import scala.concurrent.duration._

class HotswapSpec extends BaseSpec { outer =>

sequential
Expand Down Expand Up @@ -75,6 +77,33 @@ class HotswapSpec extends BaseSpec { outer =>
}
}
}

"not release current resource while it is in use" in ticked { implicit ticker =>
val r = Resource.make(IO.ref(true))(_.set(false))
val go = Hotswap.create[IO, Ref[IO, Boolean]].use { hs =>
hs.swap(r) *> (IO.sleep(1.second) *> hs.clear).background.surround {
hs.get.use {
case Some(ref) =>
val notReleased = ref.get.flatMap(b => IO(b must beTrue))
notReleased *> IO.sleep(2.seconds) *> notReleased.void
case None => IO(false must beTrue).void
}
}
}

go must completeAs(())
}

"resource can be accessed concurrently" in ticked { implicit ticker =>
val go = Hotswap.create[IO, Unit].use { hs =>
hs.swap(Resource.unit) *>
hs.get.useForever.background.surround {
IO.sleep(1.second) *> hs.get.use_
}
}

go must completeAs(())
}
}

}