Skip to content

Commit

Permalink
Add tests for Promise and Ref
Browse files Browse the repository at this point in the history
  • Loading branch information
SystemFw committed Dec 2, 2017
1 parent af5f4c4 commit 6d5f172
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 69 deletions.
49 changes: 49 additions & 0 deletions core/jvm/src/test/scala/fs2/async/RefSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package fs2
package async

import cats.effect.IO
import cats.implicits._

class RefSpec extends Fs2Spec with EventuallySupport {

"Ref" - {

"concurrent modifications" in {
val finalValue = 10
// Cannot use streams, parallelSequence or Promise since they are implemented with Ref
val r = refOf[IO, Int](0).unsafeRunSync

List.fill(finalValue) {
fork(r.modify(_ + 1))
}.sequence.unsafeRunSync

eventually { r.get.unsafeRunSync shouldBe finalValue }
}

"successful access" in {
val op = for {
r <- refOf[IO, Int](0)
valueAndSetter <- r.access
(value, setter) = valueAndSetter
success <- setter(value + 1)
result <- r.get
} yield success && result == 1

op.unsafeRunSync shouldBe true
}

"failed access" in {
val op = for {
r <- refOf[IO, Int](0)
valueAndSetter <- r.access
(value, setter) = valueAndSetter
_ <- r.setSync(5)
success <- setter(value + 1)
result <- r.get
} yield !success && result == 5

op.unsafeRunSync shouldBe true
}

}
}
5 changes: 2 additions & 3 deletions core/shared/src/main/scala/fs2/async/Promise.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,10 @@ object Promise {
def empty[F[_], A](implicit F: Effect[F], ec: ExecutionContext): F[Promise[F, A]] =
F.delay(unsafeCreate[F, A])

// TODO inline?
private[fs2] def unsafeCreate[F[_]: Effect, A](implicit ec: ExecutionContext): Promise[F, A] =
new Promise[F, A](new Ref(new AtomicReference(Promise.State.Unset(LinkedMap.empty))))

private[async] sealed abstract class State[A]
private sealed abstract class State[A]
private object State {
final case class Set[A](a: A) extends State[A]
final case class Unset[A](waiting: LinkedMap[Token, A => Unit]) extends State[A]
Expand Down Expand Up @@ -172,7 +171,7 @@ object Promise {
time("SYNC REF: get")(op(ref.get.void))
time("PROMISE: get")(op(promise.get.void))
time("SYNC REF: get")(op(ref.get.void))
time("PROMISE: get")(op(promise.cancellableGet.void))
time("PROMISE: get")(op(promise.cancellableGet.flatMap(_._1).void))
}

}
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/async/Ref.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ final class Ref[F[_], A] private[fs2] (private val ar: AtomicReference[A])(impli
* never succeeds again.
*/
def access: F[(A, A => F[Boolean])] = F.delay {
def snapshot = ar.get
val snapshot = ar.get
def setter = (a: A) => F.delay(ar.compareAndSet(snapshot, a))

(snapshot, setter)
Expand Down
54 changes: 54 additions & 0 deletions core/shared/src/test/scala/fs2/async/PromiseSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package fs2
package async

import cats.effect.IO
import cats.implicits._

import scala.concurrent.duration._

class PromiseSpec extends AsyncFs2Spec {

"Promise" - {

"setSync" in {
promise[IO, Int].flatMap { p =>
p.setSync(0) *> p.get
}.unsafeToFuture.map { _ shouldBe 0 }
}

"setSync is only successful once" in {
promise[IO, Int].flatMap { p =>
p.setSync(0) *> p.setSync(1) *> p.get
}.unsafeToFuture.map { _ shouldBe 0 }
}

"get blocks until set" in {
val op = for {
state <- refOf[IO, Int](0)
modifyGate <- promise[IO, Unit]
readGate <- promise[IO, Unit]
_ <- fork {
modifyGate.get *> state.modify(_ * 2) *> readGate.setSync(())
}
_ <- fork {
state.setSync(1) *> modifyGate.setSync(())
}
_ <- readGate.get
res <- state.get
} yield res

op.unsafeToFuture.map(_ shouldBe 2)
}

"timedGet" in {
mkScheduler.evalMap { scheduler =>
for {
p <- async.promise[IO,Int]
first <- p.timedGet(100.millis, scheduler)
_ <- p.setSync(42)
second <- p.timedGet(100.millis, scheduler)
} yield List(first, second)
}.runLog.unsafeToFuture.map(_.flatten shouldBe Vector(None, Some(42)))
}
}
}
65 changes: 0 additions & 65 deletions core/shared/src/test/scala/fs2/async/RefSpec.scala

This file was deleted.

0 comments on commit 6d5f172

Please sign in to comment.