diff --git a/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala b/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala index ad9d29c68f..b610776d63 100644 --- a/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala +++ b/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala @@ -58,5 +58,9 @@ private[effect] object IOPlatform { * The default for JavaScript is 32, from which we substract 1 * as an optimization. */ - private[effect] final val fusionMaxStackDepth = 31 + final val fusionMaxStackDepth = 31 + + /** Returns `true` if the underlying platform is the JVM, + * `false` if it's JavaScript. */ + final val isJVM = false } diff --git a/core/js/src/main/scala/cats/effect/internals/TrampolineEC.scala b/core/js/src/main/scala/cats/effect/internals/TrampolineEC.scala new file mode 100644 index 0000000000..5e0d8be390 --- /dev/null +++ b/core/js/src/main/scala/cats/effect/internals/TrampolineEC.scala @@ -0,0 +1,102 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import scala.annotation.tailrec +import scala.concurrent.ExecutionContext + +/** INTERNAL API — a [[scala.concurrent.ExecutionContext]] implementation + * that executes runnables immediately, on the current thread, + * by means of a trampoline implementation. + * + * Can be used in some cases to keep the asynchronous execution + * on the current thread, as an optimization, but be warned, + * you have to know what you're doing. + * + * This is the JavaScript-specific implementation, for which we + * don't need a `ThreadLocal` or Scala's `BlockingContext`. + */ +private[effect] final class TrampolineEC private (underlying: ExecutionContext) + extends ExecutionContext { + + // Starts with `null`! + private[this] var localTasks: List[Runnable] = _ + + override def execute(runnable: Runnable): Unit = + localTasks match { + case null => + // If we aren't in local mode yet, start local loop + localTasks = Nil + localRunLoop(runnable) + case some => + // If we are already in batching mode, add to stack + localTasks = runnable :: some + } + + + @tailrec private def localRunLoop(head: Runnable): Unit = { + try { head.run() } catch { + case NonFatal(ex) => + // Sending everything else to the underlying context + forkTheRest(null) + reportFailure(ex) + } + + localTasks match { + case null => () + case Nil => + localTasks = null + case h2 :: t2 => + localTasks = t2 + localRunLoop(h2) + } + } + + private def forkTheRest(newLocalTasks: Nil.type): Unit = { + val rest = localTasks + localTasks = newLocalTasks + + rest match { + case null | Nil => () + case head :: tail => + underlying.execute(new ResumeRun(head, tail)) + } + } + + override def reportFailure(t: Throwable): Unit = + underlying.reportFailure(t) + + private final class ResumeRun(head: Runnable, rest: List[Runnable]) + extends Runnable { + + def run(): Unit = { + localTasks = rest + localRunLoop(head) + } + } +} + +private[effect] object TrampolineEC { + /** [[TrampolineEC]] instance that executes everything + * immediately, on the current call stack. + */ + val immediate: TrampolineEC = + new TrampolineEC(new ExecutionContext { + def execute(r: Runnable): Unit = r.run() + def reportFailure(e: Throwable): Unit = throw e + }) +} \ No newline at end of file diff --git a/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala b/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala index 38949a7bcb..448c19b83f 100644 --- a/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala +++ b/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala @@ -116,11 +116,15 @@ private[effect] object IOPlatform { * ... * */ - private[effect] final val fusionMaxStackDepth = + final val fusionMaxStackDepth = Option(System.getProperty("cats.effect.fusionMaxStackDepth", "")) .filter(s => s != null && s.nonEmpty) .flatMap(s => Try(s.toInt).toOption) .filter(_ > 0) .map(_ - 1) .getOrElse(127) + + /** Returns `true` if the underlying platform is the JVM, + * `false` if it's JavaScript. */ + final val isJVM = true } diff --git a/core/jvm/src/main/scala/cats/effect/internals/TrampolineEC.scala b/core/jvm/src/main/scala/cats/effect/internals/TrampolineEC.scala new file mode 100644 index 0000000000..2a249a6033 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/internals/TrampolineEC.scala @@ -0,0 +1,122 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import scala.annotation.tailrec +import scala.concurrent.{BlockContext, CanAwait, ExecutionContext} + +/** INTERNAL API — a [[scala.concurrent.ExecutionContext]] implementation + * that executes runnables immediately, on the current thread, + * by means of a trampoline implementation. + * + * Can be used in some cases to keep the asynchronous execution + * on the current thread, as an optimization, but be warned, + * you have to know what you're doing. + * + * This is the JVM-specific implementation, for which we + * need a `ThreadLocal` and Scala's `BlockingContext`. + */ +private[effect] final class TrampolineEC private (underlying: ExecutionContext) + extends ExecutionContext { + + private[this] val localTasks = new ThreadLocal[List[Runnable]]() + + private[this] val trampolineContext: BlockContext = + new BlockContext { + def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + // In case of blocking, execute all scheduled local tasks on + // a separate thread, otherwise we could end up with a dead-lock + forkTheRest(Nil) + thunk + } + } + + override def execute(runnable: Runnable): Unit = + localTasks.get match { + case null => + // If we aren't in local mode yet, start local loop + localTasks.set(Nil) + BlockContext.withBlockContext(trampolineContext) { + localRunLoop(runnable) + } + case some => + // If we are already in batching mode, add to stack + localTasks.set(runnable :: some) + } + + @tailrec private def localRunLoop(head: Runnable): Unit = { + try { head.run() } catch { + case NonFatal(ex) => + // Sending everything else to the underlying context + forkTheRest(null) + reportFailure(ex) + } + + localTasks.get() match { + case null => () + case Nil => + localTasks.set(null) + case h2 :: t2 => + localTasks.set(t2) + localRunLoop(h2) + } + } + + private def forkTheRest(newLocalTasks: Nil.type): Unit = { + val rest = localTasks.get() + localTasks.set(newLocalTasks) + + rest match { + case null | Nil => () + case head :: tail => + underlying.execute(new ResumeRun(head, tail)) + } + } + + override def reportFailure(t: Throwable): Unit = + underlying.reportFailure(t) + + private final class ResumeRun(head: Runnable, rest: List[Runnable]) + extends Runnable { + + def run(): Unit = { + localTasks.set(rest) + localRunLoop(head) + } + } +} + +private[effect] object TrampolineEC { + /** [[TrampolineEC]] instance that executes everything + * immediately, on the current thread. + * + * Implementation notes: + * + * - if too many `blocking` operations are chained, at some point + * the implementation will trigger a stack overflow error + * - `reportError` re-throws the exception in the hope that it + * will get caught and reported by the underlying thread-pool, + * because there's nowhere it could report that error safely + * (i.e. `System.err` might be routed to `/dev/null` and we'd + * have no way to override it) + */ + val immediate: TrampolineEC = + new TrampolineEC(new ExecutionContext { + def execute(r: Runnable): Unit = r.run() + def reportFailure(e: Throwable): Unit = throw e + }) +} \ No newline at end of file diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 7bd5d0af2f..2fe26589b7 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -17,9 +17,11 @@ package cats package effect +import cats.arrow.FunctionK import cats.effect.internals.IOFrame.ErrorHandler -import cats.effect.internals.{IOFrame, IOPlatform, IORunLoop, NonFatal} +import cats.effect.internals._ import cats.effect.internals.IOPlatform.fusionMaxStackDepth + import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.duration._ @@ -283,8 +285,27 @@ sealed abstract class IO[+A] { } } -private[effect] abstract class IOLowPriorityInstances { +private[effect] abstract class IOParallelNewtype { + /** Newtype encoding for an `IO` datatype that has a `cats.Applicative` + * capable of doing parallel processing in `ap` and `map2`, needed + * for implementing `cats.Parallel`. + * + * Helpers are provided for converting back and forth in `Par.apply` + * for wrapping any `IO` value and `Par.unwrap` for unwrapping. + * + * The encoding is based on the "newtypes" project by + * Alexander Konovalov, chosen because it's devoid of boxing issues and + * a good choice until opaque types will land in Scala. + */ + type Par[+A] = Par.Type[A] + + /** Newtype encoding, see the [[IO.Par]] type alias + * for more details. + */ + object Par extends IONewtype +} +private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype { private[effect] class IOSemigroup[A: Semigroup] extends Semigroup[IO[A]] { def combine(ioa1: IO[A], ioa2: IO[A]) = ioa1.flatMap(a1 => ioa2.map(a2 => Semigroup[A].combine(a1, a2))) @@ -295,6 +316,24 @@ private[effect] abstract class IOLowPriorityInstances { private[effect] abstract class IOInstances extends IOLowPriorityInstances { + implicit val parApplicative: Applicative[IO.Par] = new Applicative[IO.Par] { + import IO.Par.unwrap + import IO.Par.{apply => par} + + override def pure[A](x: A): IO.Par[A] = + par(IO.pure(x)) + override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = + par(IOParMap(unwrap(fa), unwrap(fb))(f)) + override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] = + map2(ff, fa)(_(_)) + override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] = + map2(fa, fb)((_, _)) + override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] = + par(unwrap(fa).map(f)) + override def unit: IO.Par[Unit] = + par(IO.unit) + } + implicit val ioEffect: Effect[IO] = new Effect[IO] { override def pure[A](a: A): IO[A] = IO.pure(a) @@ -332,6 +371,18 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances { } } + implicit val ioParallel: Parallel[IO, IO.Par] = + new Parallel[IO, IO.Par] { + override def applicative: Applicative[IO.Par] = + parApplicative + override def monad: Monad[IO] = + ioEffect + override val sequential: ~>[IO.Par, IO] = + new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = IO.Par.unwrap(fa) } + override val parallel: ~>[IO, IO.Par] = + new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = IO.Par(fa) } + } + implicit def ioMonoid[A: Monoid]: Monoid[IO[A]] = new IOSemigroup[A] with Monoid[IO[A]] { def empty = IO.pure(Monoid[A].empty) } diff --git a/core/shared/src/main/scala/cats/effect/internals/IONewtype.scala b/core/shared/src/main/scala/cats/effect/internals/IONewtype.scala new file mode 100644 index 0000000000..745f037905 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/internals/IONewtype.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import cats.effect.IO + +/** INTERNAL API — Newtype encoding, used for defining `IO.Par`. + * + * The `IONewtype` abstract class indirection is needed for Scala 2.10, + * otherwise we could just define these types straight on the + * `IO.Par` companion object. In Scala 2.10 defining these types + * straight on the companion object yields an error like + * ''"only classes can have declared but undefined members"''. + * + * Inspired by + * [[https://github.com/alexknvl/newtypes alexknvl/newtypes]]. + */ +private[effect] abstract class IONewtype { self => + type Base + trait Tag extends Any + type Type[+A] <: Base with Tag + + def apply[A](fa: IO[A]): Type[A] = + fa.asInstanceOf[Type[A]] + + def unwrap[A](fa: Type[A]): IO[A] = + fa.asInstanceOf[IO[A]] +} diff --git a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala new file mode 100644 index 0000000000..9730c28e79 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import java.util.concurrent.atomic.AtomicReference +import cats.effect.IO +import cats.effect.util.CompositeException +import scala.concurrent.ExecutionContext + +private[effect] object IOParMap { + type Attempt[A] = Either[Throwable, A] + type State[A, B] = Either[Attempt[A], Attempt[B]] + + def apply[A, B, C](fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] = + IO.async { cb => + // For preventing stack-overflow errors; using a + // trampolined execution context, so no thread forks + implicit val ec: ExecutionContext = TrampolineEC.immediate + + // Light async boundary to prevent SO errors + ec.execute(new Runnable { + def run(): Unit = { + val state = new AtomicReference[State[A, B]]() + + def complete(ra: Attempt[A], rb: Attempt[B]): Unit = + // Second async boundary needed just before the callback + ec.execute(new Runnable { + def run(): Unit = ra match { + case Right(a) => + rb match { + case Right(b) => + cb(try Right(f(a, b)) catch { case NonFatal(e) => Left(e) }) + case error @ Left(_) => + cb(error.asInstanceOf[Left[Throwable, C]]) + } + case left @ Left(e1) => + rb match { + case Right(_) => + cb(left.asInstanceOf[Left[Throwable, C]]) + case Left(e2) => + // Signaling both errors + cb(Left(CompositeException(e1, e2))) + } + } + }) + + // First execution + fa.unsafeRunAsync { attemptA => + // Using Java 8 platform intrinsics + state.getAndSet(Left(attemptA)) match { + case null => () // wait for B + case Right(attemptB) => complete(attemptA, attemptB) + case left => + // $COVERAGE-OFF$ + throw new IllegalStateException(s"parMap: $left") + // $COVERAGE-ON$ + } + } + // Second execution + fb.unsafeRunAsync { attemptB => + // Using Java 8 platform intrinsics + state.getAndSet(Right(attemptB)) match { + case null => () // wait for A + case Left(attemptA) => complete(attemptA, attemptB) + case right => + // $COVERAGE-OFF$ + throw new IllegalStateException(s"parMap: $right") + // $COVERAGE-ON$ + } + } + } + }) + } +} diff --git a/core/shared/src/main/scala/cats/effect/util/CompositeException.scala b/core/shared/src/main/scala/cats/effect/util/CompositeException.scala new file mode 100644 index 0000000000..06325b18ff --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/util/CompositeException.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.util + +import cats.data.NonEmptyList + +/** A composite exception represents a list of exceptions + * caught from evaluating multiple independent actions + * and that need to be signaled together. + * + * Note the constructor doesn't allow wrapping anything less + * than two throwable references. + * + * Use [[cats.effect.util.CompositeException.apply apply]] + * for building composite exceptions. + */ +final class CompositeException(val head: Throwable, val tail: NonEmptyList[Throwable]) + extends RuntimeException( + s"Multiple exceptions were thrown (${1 + tail.size}), " + + s"first ${head.getClass.getName}: ${head.getMessage}") + with Serializable { + + /** Returns the set of all errors wrapped by this composite. */ + def all: NonEmptyList[Throwable] = + head :: tail +} + +object CompositeException { + /** Simple builder for [[CompositeException]]. */ + def apply(first: Throwable, second: Throwable, rest: List[Throwable] = Nil): CompositeException = + new CompositeException(first, NonEmptyList(second, rest)) +} \ No newline at end of file diff --git a/core/shared/src/test/scala/cats/effect/internals/TrampolineECTests.scala b/core/shared/src/test/scala/cats/effect/internals/TrampolineECTests.scala new file mode 100644 index 0000000000..1005364844 --- /dev/null +++ b/core/shared/src/test/scala/cats/effect/internals/TrampolineECTests.scala @@ -0,0 +1,114 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import org.scalatest.{FunSuite, Matchers} +import cats.effect.internals.TrampolineEC.immediate + +import scala.concurrent.ExecutionContext +import cats.effect.internals.IOPlatform.isJVM + +import scala.collection.immutable.Queue + +class TrampolineECTests extends FunSuite with Matchers { + implicit val ec: ExecutionContext = immediate + + def executeImmediate(f: => Unit): Unit = + ec.execute(new Runnable { def run(): Unit = f }) + + test("execution should be immediate") { + var effect = 0 + + executeImmediate { + effect += 1 + executeImmediate { + effect += 2 + executeImmediate { + effect += 3 + } + } + } + + effect shouldEqual 1 + 2 + 3 + } + + test("concurrent execution") { + var effect = List.empty[Int] + + executeImmediate { + executeImmediate { effect = 1 :: effect } + executeImmediate { effect = 2 :: effect } + executeImmediate { effect = 3 :: effect } + } + + effect shouldEqual List(1, 2, 3) + } + + test("stack safety") { + var effect = 0 + def loop(n: Int, acc: Int): Unit = + executeImmediate { + if (n > 0) loop(n - 1, acc + 1) + else effect = acc + } + + val n = if (isJVM) 100000 else 5000 + loop(n, 0) + + effect shouldEqual n + } + + test("on blocking it should fork") { + assume(isJVM, "test relevant only for the JVM") + import concurrent.blocking + + var effects = Queue.empty[Int] + executeImmediate { + executeImmediate { effects = effects.enqueue(4) } + executeImmediate { effects = effects.enqueue(4) } + + effects = effects.enqueue(1) + blocking { effects = effects.enqueue(2) } + effects = effects.enqueue(3) + } + + effects shouldBe Queue(1, 4, 4, 2, 3) + } + + test("thrown exceptions should trigger scheduled execution") { + val dummy1 = new RuntimeException("dummy1") + val dummy2 = new RuntimeException("dummy1") + var effects = 0 + + try { + executeImmediate { + executeImmediate { effects += 1 } + executeImmediate { effects += 1 } + executeImmediate { + executeImmediate { effects += 1 } + executeImmediate { effects += 1 } + throw dummy2 + } + throw dummy1 + } + fail("should have thrown exception") + } catch { + case `dummy2` => + effects shouldBe 4 + } + } +} diff --git a/laws/js/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala b/laws/js/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala deleted file mode 100644 index a1be7f942e..0000000000 --- a/laws/js/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2017 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats -package effect -package laws -package discipline - -private[discipline] trait TestsPlatform { - final def isJVM = false -} diff --git a/laws/jvm/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala b/laws/jvm/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala deleted file mode 100644 index 94e40469d6..0000000000 --- a/laws/jvm/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2017 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats -package effect -package laws -package discipline - -private[discipline] trait TestsPlatform { - final def isJVM = true -} diff --git a/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala b/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala index 049b292573..a89b34ee5f 100644 --- a/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala +++ b/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala @@ -17,7 +17,7 @@ package cats.effect import org.scalatest._ - +import cats.syntax.all._ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ @@ -85,6 +85,18 @@ class IOJVMTests extends FunSuite with Matchers { assert(elapsed >= 100) } + test("parMap2 concurrently") { + import scala.concurrent.ExecutionContext.Implicits.global + + val io1 = IO.shift *> IO(1) + val io2 = IO.shift *> IO(2) + + for (_ <- 0 until 1000) { + val r = (io1, io2).parMapN(_ + _).unsafeRunSync + r shouldEqual 3 + } + } + // this is expected behavior // ...also it appears to fell the mighty Travis, so it's disabled for now /*test("fail to provide stack safety with repeated async suspensions") { diff --git a/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala b/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala index b1dd064bc5..779330f0d0 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala @@ -19,9 +19,9 @@ package effect package laws package discipline +import cats.effect.IO.Par import org.scalacheck._ import org.scalacheck.Arbitrary.{arbitrary => getArbitrary} - import scala.concurrent.Future import scala.util.Either @@ -29,6 +29,9 @@ object arbitrary { implicit def catsEffectLawsArbitraryForIO[A: Arbitrary: Cogen]: Arbitrary[IO[A]] = Arbitrary(Gen.delay(genIO[A])) + implicit def catsEffectLawsArbitraryForIOParallel[A: Arbitrary: Cogen]: Arbitrary[IO.Par[A]] = + Arbitrary(catsEffectLawsArbitraryForIO[A].arbitrary.map(Par.apply)) + def genIO[A: Arbitrary: Cogen]: Gen[IO[A]] = { Gen.frequency( 5 -> genPure[A], @@ -88,5 +91,5 @@ object arbitrary { } yield ioa.map(f1).map(f2) implicit def catsEffectLawsCogenForIO[A](implicit cgfa: Cogen[Future[A]]): Cogen[IO[A]] = - cgfa.contramap((ioa: IO[A]) => ioa.unsafeToFuture) + cgfa.contramap((ioa: IO[A]) => ioa.unsafeToFuture()) } diff --git a/laws/shared/src/main/scala/cats/effect/laws/discipline/SyncTests.scala b/laws/shared/src/main/scala/cats/effect/laws/discipline/SyncTests.scala index 86989c29c0..04e6dd3f0d 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/discipline/SyncTests.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/discipline/SyncTests.scala @@ -20,12 +20,13 @@ package laws package discipline import cats.data._ +import cats.effect.internals.IOPlatform.isJVM import cats.laws.discipline._ import cats.laws.discipline.SemigroupalTests.Isomorphisms - import org.scalacheck._, Prop.forAll -trait SyncTests[F[_]] extends MonadErrorTests[F, Throwable] with TestsPlatform { + +trait SyncTests[F[_]] extends MonadErrorTests[F, Throwable] { def laws: SyncLaws[F] def sync[A: Arbitrary: Eq, B: Arbitrary: Eq, C: Arbitrary: Eq]( diff --git a/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala b/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala index a37dedf088..d3f2f41875 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala @@ -17,7 +17,10 @@ package cats.effect.laws.util import cats.effect.IO +import cats.effect.util.CompositeException import cats.kernel.Eq + +import scala.annotation.tailrec import scala.concurrent.{ExecutionException, Future} import scala.util.{Failure, Success} @@ -39,6 +42,17 @@ trait TestInstances { eqFuture[A].eqv(x.unsafeToFuture(), y.unsafeToFuture()) } + /** + * Defines equality for `IO.Par` references that can + * get interpreted by means of a [[TestContext]]. + */ + implicit def eqIOPar[A](implicit A: Eq[A], ec: TestContext): Eq[IO.Par[A]] = + new Eq[IO.Par[A]] { + import IO.Par.unwrap + def eqv(x: IO.Par[A], y: IO.Par[A]): Boolean = + eqFuture[A].eqv(unwrap(x).unsafeToFuture(), unwrap(y).unsafeToFuture()) + } + /** * Defines equality for `Future` references that can * get interpreted by means of a [[TestContext]]. @@ -69,12 +83,15 @@ trait TestInstances { // Unwraps exceptions that got caught by Future's implementation // and that got wrapped in ExecutionException (`Future(throw ex)`) - def extractEx(ex: Throwable): String = { - var ref = ex - while (ref.isInstanceOf[ExecutionException] && ref.getCause != null) { - ref = ref.getCause + @tailrec def extractEx(ex: Throwable): String = { + ex match { + case e: ExecutionException if e.getCause != null => + extractEx(e.getCause) + case e: CompositeException => + extractEx(e.head) + case _ => + s"${ex.getClass.getName}: ${ex.getMessage}" } - s"${ref.getClass.getName}: ${ref.getMessage}" } } } diff --git a/laws/shared/src/test/scala/cats/effect/IOAsyncTests.scala b/laws/shared/src/test/scala/cats/effect/IOAsyncTests.scala index f768647a03..cdd8eb4458 100644 --- a/laws/shared/src/test/scala/cats/effect/IOAsyncTests.scala +++ b/laws/shared/src/test/scala/cats/effect/IOAsyncTests.scala @@ -18,7 +18,6 @@ package cats.effect import org.scalactic.source.Position import org.scalatest.{Assertion, AsyncFunSuite, Matchers} - import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success, Try} diff --git a/laws/shared/src/test/scala/cats/effect/IOTests.scala b/laws/shared/src/test/scala/cats/effect/IOTests.scala index 7140fd57ef..7c31b882d5 100644 --- a/laws/shared/src/test/scala/cats/effect/IOTests.scala +++ b/laws/shared/src/test/scala/cats/effect/IOTests.scala @@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger import cats.effect.internals.IOPlatform import cats.effect.laws.discipline.EffectTests import cats.effect.laws.discipline.arbitrary._ +import cats.effect.laws.util.TestContext +import cats.effect.util.CompositeException import cats.implicits._ import cats.kernel.laws.discipline.MonoidTests import cats.laws._ @@ -38,6 +40,13 @@ class IOTests extends BaseTestsSuite { checkAllAsync("IO", implicit ec => MonoidTests[IO[Int]].monoid) checkAllAsync("IO", implicit ec => SemigroupKTests[IO].semigroupK[Int]) + checkAllAsync("IO.Par", implicit ec => ApplicativeTests[IO.Par].applicative[Int, Int, Int]) + checkAllAsync("IO", implicit ec => ParallelTests[IO, IO.Par].parallel[Int, Int]) + + test("IO.Par's applicative instance is different") { + implicitly[Applicative[IO]] shouldNot be(implicitly[Applicative[IO.Par]]) + } + test("defer evaluation until run") { var run = false val ioa = IO { run = true } @@ -412,6 +421,63 @@ class IOTests extends BaseTestsSuite { io.unsafeRunSync() shouldEqual max * 10000 } + + test("parMap2 for successful values") { + implicit val ec = TestContext() + val io1 = IO.shift *> IO.pure(1) + val io2 = IO.shift *> IO.pure(2) + + val io3 = (io1, io2).parMapN(_ + _) + val f = io3.unsafeToFuture() + ec.tick() + f.value shouldEqual Some(Success(3)) + } + + test("parMap2 can fail for one") { + implicit val ec = TestContext() + val dummy = new RuntimeException("dummy") + val io1 = IO.shift *> IO.pure(1) + val io2 = IO.shift *> IO.raiseError[Int](dummy) + + val io3 = (io1, io2).parMapN(_ + _) + val f1 = io3.unsafeToFuture() + + ec.tick() + f1.value shouldEqual Some(Failure(dummy)) + + val io4 = (io2, io1).parMapN(_ + _) + val f2 = io4.unsafeToFuture() + + ec.tick() + f2.value shouldEqual Some(Failure(dummy)) + } + + test("parMap2 can fail for both") { + implicit val ec = TestContext() + val dummy1 = new RuntimeException("dummy1") + val dummy2 = new RuntimeException("dummy2") + + val io1 = IO.shift *> IO.raiseError[Int](dummy1) + val io2 = IO.shift *> IO.raiseError[Int](dummy2) + + val io3 = (io1, io2).parMapN(_ + _) + val f1 = io3.unsafeToFuture() + ec.tick() + + val errors = f1.value.collect { + case Failure(ref: CompositeException) => ref.all.toList + } + + errors shouldBe Some(List(dummy1, dummy2)) + } + + test("parMap2 is stack safe") { + val count = if (IOPlatform.isJVM) 100000 else 5000 + val io = (0 until count).foldLeft(IO(0))((acc, e) => (acc, IO(e)).parMapN(_ + _)) + + val f = io.unsafeToFuture() + f.value shouldEqual Some(Success(count * (count - 1) / 2)) + } } object IOTests {