From b801b569b150cd73f934430e5aedfe128a216512 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Fri, 12 Jan 2018 19:52:25 +0200 Subject: [PATCH 01/14] Add trampolined execution context --- .../cats/effect/internals/IOPlatform.scala | 6 +- .../cats/effect/internals/TrampolineEC.scala | 102 +++++++++++++++ .../cats/effect/internals/IOPlatform.scala | 6 +- .../cats/effect/internals/TrampolineEC.scala | 122 ++++++++++++++++++ .../cats/effect/internals/IOParMap.scala | 13 +- .../effect/internals/TrampolineECTests.scala | 114 ++++++++++++++++ .../test/scala/cats/effect/IOJSTests.scala | 52 -------- .../laws/discipline/TestsPlatform.scala | 24 ---- .../test/scala/cats/effect/IOJVMTests.scala | 99 -------------- .../effect/laws/discipline/SyncTests.scala | 5 +- 10 files changed, 358 insertions(+), 185 deletions(-) create mode 100644 core/js/src/main/scala/cats/effect/internals/TrampolineEC.scala create mode 100644 core/jvm/src/main/scala/cats/effect/internals/TrampolineEC.scala rename laws/js/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala => core/shared/src/main/scala/cats/effect/internals/IOParMap.scala (76%) create mode 100644 core/shared/src/test/scala/cats/effect/internals/TrampolineECTests.scala delete mode 100644 laws/js/src/test/scala/cats/effect/IOJSTests.scala delete mode 100644 laws/jvm/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala delete mode 100644 laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala diff --git a/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala b/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala index ad9d29c68f..b610776d63 100644 --- a/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala +++ b/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala @@ -58,5 +58,9 @@ private[effect] object IOPlatform { * The default for JavaScript is 32, from which we substract 1 * as an optimization. */ - private[effect] final val fusionMaxStackDepth = 31 + final val fusionMaxStackDepth = 31 + + /** Returns `true` if the underlying platform is the JVM, + * `false` if it's JavaScript. */ + final val isJVM = false } diff --git a/core/js/src/main/scala/cats/effect/internals/TrampolineEC.scala b/core/js/src/main/scala/cats/effect/internals/TrampolineEC.scala new file mode 100644 index 0000000000..5e0d8be390 --- /dev/null +++ b/core/js/src/main/scala/cats/effect/internals/TrampolineEC.scala @@ -0,0 +1,102 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import scala.annotation.tailrec +import scala.concurrent.ExecutionContext + +/** INTERNAL API — a [[scala.concurrent.ExecutionContext]] implementation + * that executes runnables immediately, on the current thread, + * by means of a trampoline implementation. + * + * Can be used in some cases to keep the asynchronous execution + * on the current thread, as an optimization, but be warned, + * you have to know what you're doing. + * + * This is the JavaScript-specific implementation, for which we + * don't need a `ThreadLocal` or Scala's `BlockingContext`. + */ +private[effect] final class TrampolineEC private (underlying: ExecutionContext) + extends ExecutionContext { + + // Starts with `null`! + private[this] var localTasks: List[Runnable] = _ + + override def execute(runnable: Runnable): Unit = + localTasks match { + case null => + // If we aren't in local mode yet, start local loop + localTasks = Nil + localRunLoop(runnable) + case some => + // If we are already in batching mode, add to stack + localTasks = runnable :: some + } + + + @tailrec private def localRunLoop(head: Runnable): Unit = { + try { head.run() } catch { + case NonFatal(ex) => + // Sending everything else to the underlying context + forkTheRest(null) + reportFailure(ex) + } + + localTasks match { + case null => () + case Nil => + localTasks = null + case h2 :: t2 => + localTasks = t2 + localRunLoop(h2) + } + } + + private def forkTheRest(newLocalTasks: Nil.type): Unit = { + val rest = localTasks + localTasks = newLocalTasks + + rest match { + case null | Nil => () + case head :: tail => + underlying.execute(new ResumeRun(head, tail)) + } + } + + override def reportFailure(t: Throwable): Unit = + underlying.reportFailure(t) + + private final class ResumeRun(head: Runnable, rest: List[Runnable]) + extends Runnable { + + def run(): Unit = { + localTasks = rest + localRunLoop(head) + } + } +} + +private[effect] object TrampolineEC { + /** [[TrampolineEC]] instance that executes everything + * immediately, on the current call stack. + */ + val immediate: TrampolineEC = + new TrampolineEC(new ExecutionContext { + def execute(r: Runnable): Unit = r.run() + def reportFailure(e: Throwable): Unit = throw e + }) +} \ No newline at end of file diff --git a/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala b/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala index 38949a7bcb..448c19b83f 100644 --- a/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala +++ b/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala @@ -116,11 +116,15 @@ private[effect] object IOPlatform { * ... * */ - private[effect] final val fusionMaxStackDepth = + final val fusionMaxStackDepth = Option(System.getProperty("cats.effect.fusionMaxStackDepth", "")) .filter(s => s != null && s.nonEmpty) .flatMap(s => Try(s.toInt).toOption) .filter(_ > 0) .map(_ - 1) .getOrElse(127) + + /** Returns `true` if the underlying platform is the JVM, + * `false` if it's JavaScript. */ + final val isJVM = true } diff --git a/core/jvm/src/main/scala/cats/effect/internals/TrampolineEC.scala b/core/jvm/src/main/scala/cats/effect/internals/TrampolineEC.scala new file mode 100644 index 0000000000..2a249a6033 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/internals/TrampolineEC.scala @@ -0,0 +1,122 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import scala.annotation.tailrec +import scala.concurrent.{BlockContext, CanAwait, ExecutionContext} + +/** INTERNAL API — a [[scala.concurrent.ExecutionContext]] implementation + * that executes runnables immediately, on the current thread, + * by means of a trampoline implementation. + * + * Can be used in some cases to keep the asynchronous execution + * on the current thread, as an optimization, but be warned, + * you have to know what you're doing. + * + * This is the JVM-specific implementation, for which we + * need a `ThreadLocal` and Scala's `BlockingContext`. + */ +private[effect] final class TrampolineEC private (underlying: ExecutionContext) + extends ExecutionContext { + + private[this] val localTasks = new ThreadLocal[List[Runnable]]() + + private[this] val trampolineContext: BlockContext = + new BlockContext { + def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + // In case of blocking, execute all scheduled local tasks on + // a separate thread, otherwise we could end up with a dead-lock + forkTheRest(Nil) + thunk + } + } + + override def execute(runnable: Runnable): Unit = + localTasks.get match { + case null => + // If we aren't in local mode yet, start local loop + localTasks.set(Nil) + BlockContext.withBlockContext(trampolineContext) { + localRunLoop(runnable) + } + case some => + // If we are already in batching mode, add to stack + localTasks.set(runnable :: some) + } + + @tailrec private def localRunLoop(head: Runnable): Unit = { + try { head.run() } catch { + case NonFatal(ex) => + // Sending everything else to the underlying context + forkTheRest(null) + reportFailure(ex) + } + + localTasks.get() match { + case null => () + case Nil => + localTasks.set(null) + case h2 :: t2 => + localTasks.set(t2) + localRunLoop(h2) + } + } + + private def forkTheRest(newLocalTasks: Nil.type): Unit = { + val rest = localTasks.get() + localTasks.set(newLocalTasks) + + rest match { + case null | Nil => () + case head :: tail => + underlying.execute(new ResumeRun(head, tail)) + } + } + + override def reportFailure(t: Throwable): Unit = + underlying.reportFailure(t) + + private final class ResumeRun(head: Runnable, rest: List[Runnable]) + extends Runnable { + + def run(): Unit = { + localTasks.set(rest) + localRunLoop(head) + } + } +} + +private[effect] object TrampolineEC { + /** [[TrampolineEC]] instance that executes everything + * immediately, on the current thread. + * + * Implementation notes: + * + * - if too many `blocking` operations are chained, at some point + * the implementation will trigger a stack overflow error + * - `reportError` re-throws the exception in the hope that it + * will get caught and reported by the underlying thread-pool, + * because there's nowhere it could report that error safely + * (i.e. `System.err` might be routed to `/dev/null` and we'd + * have no way to override it) + */ + val immediate: TrampolineEC = + new TrampolineEC(new ExecutionContext { + def execute(r: Runnable): Unit = r.run() + def reportFailure(e: Throwable): Unit = throw e + }) +} \ No newline at end of file diff --git a/laws/js/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala similarity index 76% rename from laws/js/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala rename to core/shared/src/main/scala/cats/effect/internals/IOParMap.scala index a1be7f942e..fa5fd606a7 100644 --- a/laws/js/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala @@ -14,11 +14,12 @@ * limitations under the License. */ -package cats -package effect -package laws -package discipline +package cats.effect.internals -private[discipline] trait TestsPlatform { - final def isJVM = false +import cats.effect.IO + +private[effect] object IOParMap { + + def apply[A, B, C](fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] = + IO.async { cb => } } diff --git a/core/shared/src/test/scala/cats/effect/internals/TrampolineECTests.scala b/core/shared/src/test/scala/cats/effect/internals/TrampolineECTests.scala new file mode 100644 index 0000000000..1005364844 --- /dev/null +++ b/core/shared/src/test/scala/cats/effect/internals/TrampolineECTests.scala @@ -0,0 +1,114 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import org.scalatest.{FunSuite, Matchers} +import cats.effect.internals.TrampolineEC.immediate + +import scala.concurrent.ExecutionContext +import cats.effect.internals.IOPlatform.isJVM + +import scala.collection.immutable.Queue + +class TrampolineECTests extends FunSuite with Matchers { + implicit val ec: ExecutionContext = immediate + + def executeImmediate(f: => Unit): Unit = + ec.execute(new Runnable { def run(): Unit = f }) + + test("execution should be immediate") { + var effect = 0 + + executeImmediate { + effect += 1 + executeImmediate { + effect += 2 + executeImmediate { + effect += 3 + } + } + } + + effect shouldEqual 1 + 2 + 3 + } + + test("concurrent execution") { + var effect = List.empty[Int] + + executeImmediate { + executeImmediate { effect = 1 :: effect } + executeImmediate { effect = 2 :: effect } + executeImmediate { effect = 3 :: effect } + } + + effect shouldEqual List(1, 2, 3) + } + + test("stack safety") { + var effect = 0 + def loop(n: Int, acc: Int): Unit = + executeImmediate { + if (n > 0) loop(n - 1, acc + 1) + else effect = acc + } + + val n = if (isJVM) 100000 else 5000 + loop(n, 0) + + effect shouldEqual n + } + + test("on blocking it should fork") { + assume(isJVM, "test relevant only for the JVM") + import concurrent.blocking + + var effects = Queue.empty[Int] + executeImmediate { + executeImmediate { effects = effects.enqueue(4) } + executeImmediate { effects = effects.enqueue(4) } + + effects = effects.enqueue(1) + blocking { effects = effects.enqueue(2) } + effects = effects.enqueue(3) + } + + effects shouldBe Queue(1, 4, 4, 2, 3) + } + + test("thrown exceptions should trigger scheduled execution") { + val dummy1 = new RuntimeException("dummy1") + val dummy2 = new RuntimeException("dummy1") + var effects = 0 + + try { + executeImmediate { + executeImmediate { effects += 1 } + executeImmediate { effects += 1 } + executeImmediate { + executeImmediate { effects += 1 } + executeImmediate { effects += 1 } + throw dummy2 + } + throw dummy1 + } + fail("should have thrown exception") + } catch { + case `dummy2` => + effects shouldBe 4 + } + } +} diff --git a/laws/js/src/test/scala/cats/effect/IOJSTests.scala b/laws/js/src/test/scala/cats/effect/IOJSTests.scala deleted file mode 100644 index fc0884916c..0000000000 --- a/laws/js/src/test/scala/cats/effect/IOJSTests.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2017 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect - -import org.scalatest.{AsyncFunSuite, Matchers} - -import scala.concurrent.duration.{FiniteDuration, _} -import scala.concurrent.{ExecutionContext, Future} -import scala.scalajs.js.timers.setTimeout - -class IOJSTests extends AsyncFunSuite with Matchers { - implicit override def executionContext = - ExecutionContext.global - - def delayed[A](duration: FiniteDuration)(f: => A): IO[A] = - IO.async { callback => - setTimeout(duration.toMillis.toDouble)(callback(Right(f))) - } - - test("unsafeToFuture works") { - delayed(100.millis)(10).unsafeToFuture().map { r => - r shouldEqual 10 - } - } - - test("unsafeRunSync is unsupported for async stuff") { - Future { - try { - delayed(100.millis)(10).unsafeRunSync() - fail("Expected UnsupportedOperationException") - } - catch { - case _: UnsupportedOperationException => - succeed - } - } - } -} diff --git a/laws/jvm/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala b/laws/jvm/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala deleted file mode 100644 index 94e40469d6..0000000000 --- a/laws/jvm/src/main/scala/cats/effect/laws/discipline/TestsPlatform.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2017 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats -package effect -package laws -package discipline - -private[discipline] trait TestsPlatform { - final def isJVM = true -} diff --git a/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala b/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala deleted file mode 100644 index 049b292573..0000000000 --- a/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2017 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect - -import org.scalatest._ - -import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ - -class IOJVMTests extends FunSuite with Matchers { - val ThreadName = "test-thread" - - val TestEC = new ExecutionContext { - def execute(r: Runnable): Unit = { - val th = new Thread(r) - th.setName(ThreadName) - th.start() - } - - def reportFailure(cause: Throwable): Unit = - throw cause - } - - test("shift contiguous prefix and suffix, but not interfix") { - val name: IO[String] = IO { Thread.currentThread().getName } - - val aname: IO[String] = IO async { cb => - new Thread { - start() - override def run() = - cb(Right(Thread.currentThread().getName)) - } - } - - val test = for { - _ <- IO.shift(TestEC) - n1 <- name - n2 <- name - n3 <- aname - n4 <- name - _ <- IO.shift(TestEC) - n5 <- name - n6 <- name - } yield (n1, n2, n3, n4, n5, n6) - - val (n1, n2, n3, n4, n5, n6) = test.unsafeRunSync() - - n1 shouldEqual ThreadName - n2 shouldEqual ThreadName - n3 should not equal ThreadName - n4 should not equal ThreadName - n5 shouldEqual ThreadName - n6 shouldEqual ThreadName - } - - test("unsafeRunTimed(Duration.Undefined) throws exception") { - val never = IO.async[Int](_ => ()) - - intercept[IllegalArgumentException] { - never.unsafeRunTimed(Duration.Undefined) - } - } - - test("unsafeRunTimed times-out on unending IO") { - val never = IO.async[Int](_ => ()) - val start = System.currentTimeMillis() - val received = never.unsafeRunTimed(100.millis) - val elapsed = System.currentTimeMillis() - start - - received shouldEqual None - assert(elapsed >= 100) - } - - // this is expected behavior - // ...also it appears to fell the mighty Travis, so it's disabled for now - /*test("fail to provide stack safety with repeated async suspensions") { - val result = (0 until 10000).foldLeft(IO(0)) { (acc, i) => - acc.flatMap(n => IO.async[Int](_(Right(n + 1)))) - } - - intercept[StackOverflowError] { - result.unsafeRunAsync(_ => ()) - } - }*/ -} diff --git a/laws/shared/src/main/scala/cats/effect/laws/discipline/SyncTests.scala b/laws/shared/src/main/scala/cats/effect/laws/discipline/SyncTests.scala index 86989c29c0..04e6dd3f0d 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/discipline/SyncTests.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/discipline/SyncTests.scala @@ -20,12 +20,13 @@ package laws package discipline import cats.data._ +import cats.effect.internals.IOPlatform.isJVM import cats.laws.discipline._ import cats.laws.discipline.SemigroupalTests.Isomorphisms - import org.scalacheck._, Prop.forAll -trait SyncTests[F[_]] extends MonadErrorTests[F, Throwable] with TestsPlatform { + +trait SyncTests[F[_]] extends MonadErrorTests[F, Throwable] { def laws: SyncLaws[F] def sync[A: Arbitrary: Eq, B: Arbitrary: Eq, C: Arbitrary: Eq]( From eecdf49d030cc099ef16082b902d9919c84be18f Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Fri, 12 Jan 2018 20:08:44 +0200 Subject: [PATCH 02/14] Add internal `parMap2` implementation --- .../cats/effect/internals/IOParMap.scala | 56 ++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala index fa5fd606a7..505d12c750 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala @@ -16,10 +16,64 @@ package cats.effect.internals +import java.util.concurrent.atomic.AtomicReference import cats.effect.IO +import scala.concurrent.ExecutionContext private[effect] object IOParMap { + type Attempt[A] = Either[Throwable, A] + type State[A, B] = Either[Attempt[A], Attempt[B]] def apply[A, B, C](fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] = - IO.async { cb => } + IO.async { cb => + // For preventing stack-overflow errors; using a + // trampolined execution context, so no thread forks + private implicit val ec: ExecutionContext = TrampolineEC.immediate + + // Light async boundary to prevent SO errors + ec.execute(new Runnable { + def run(): Unit = { + val state = new AtomicReference[State[A, B]]() + + def complete(ra: Attempt[A], rb: Attempt[B]): Unit = + // Second async boundary needed just before the callback + ec.execute(new Runnable { + def run(): Unit = ra match { + case Right(a) => + rb match { + case Right(b) => + cb(try Right(f(a, b)) catch { case NonFatal(e) => Left(e) }) + case error @ Left(_) => + cb(error) + } + case error @ Left(_) => + cb(error) + rb match { + case Left(error2) => throw error2 + case _ => () + } + } + }) + + // First execution + fa.unsafeRunAsync { attemptA => + // Using Java 8 platform intrinsics + state.getAndSet(Left(attemptA)) match { + case null => () // wait for B + case Right(attemptB) => complete(attemptA, attemptB) + case left => throw new IllegalStateException(s"parMap: $left") + } + } + // Second execution + fb.unsafeRunAsync { attemptB => + // Using Java 8 platform intrinsics + state.getAndSet(Right(attemptB)) match { + case null => () // wait for B + case Left(attemptA) => complete(attemptA, attemptB) + case right => throw new IllegalStateException(s"parMap: $right") + } + } + } + }) + } } From 1c1afb4ef1941b4e8bed82b8ff30ac04f11cd09e Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Fri, 12 Jan 2018 20:20:01 +0200 Subject: [PATCH 03/14] Oops, deleted tests by mistake --- .../test/scala/cats/effect/IOJSTests.scala | 52 ++++++++++ .../test/scala/cats/effect/IOJVMTests.scala | 99 +++++++++++++++++++ 2 files changed, 151 insertions(+) create mode 100644 laws/js/src/test/scala/cats/effect/IOJSTests.scala create mode 100644 laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala diff --git a/laws/js/src/test/scala/cats/effect/IOJSTests.scala b/laws/js/src/test/scala/cats/effect/IOJSTests.scala new file mode 100644 index 0000000000..fc0884916c --- /dev/null +++ b/laws/js/src/test/scala/cats/effect/IOJSTests.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import org.scalatest.{AsyncFunSuite, Matchers} + +import scala.concurrent.duration.{FiniteDuration, _} +import scala.concurrent.{ExecutionContext, Future} +import scala.scalajs.js.timers.setTimeout + +class IOJSTests extends AsyncFunSuite with Matchers { + implicit override def executionContext = + ExecutionContext.global + + def delayed[A](duration: FiniteDuration)(f: => A): IO[A] = + IO.async { callback => + setTimeout(duration.toMillis.toDouble)(callback(Right(f))) + } + + test("unsafeToFuture works") { + delayed(100.millis)(10).unsafeToFuture().map { r => + r shouldEqual 10 + } + } + + test("unsafeRunSync is unsupported for async stuff") { + Future { + try { + delayed(100.millis)(10).unsafeRunSync() + fail("Expected UnsupportedOperationException") + } + catch { + case _: UnsupportedOperationException => + succeed + } + } + } +} diff --git a/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala b/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala new file mode 100644 index 0000000000..049b292573 --- /dev/null +++ b/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala @@ -0,0 +1,99 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import org.scalatest._ + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +class IOJVMTests extends FunSuite with Matchers { + val ThreadName = "test-thread" + + val TestEC = new ExecutionContext { + def execute(r: Runnable): Unit = { + val th = new Thread(r) + th.setName(ThreadName) + th.start() + } + + def reportFailure(cause: Throwable): Unit = + throw cause + } + + test("shift contiguous prefix and suffix, but not interfix") { + val name: IO[String] = IO { Thread.currentThread().getName } + + val aname: IO[String] = IO async { cb => + new Thread { + start() + override def run() = + cb(Right(Thread.currentThread().getName)) + } + } + + val test = for { + _ <- IO.shift(TestEC) + n1 <- name + n2 <- name + n3 <- aname + n4 <- name + _ <- IO.shift(TestEC) + n5 <- name + n6 <- name + } yield (n1, n2, n3, n4, n5, n6) + + val (n1, n2, n3, n4, n5, n6) = test.unsafeRunSync() + + n1 shouldEqual ThreadName + n2 shouldEqual ThreadName + n3 should not equal ThreadName + n4 should not equal ThreadName + n5 shouldEqual ThreadName + n6 shouldEqual ThreadName + } + + test("unsafeRunTimed(Duration.Undefined) throws exception") { + val never = IO.async[Int](_ => ()) + + intercept[IllegalArgumentException] { + never.unsafeRunTimed(Duration.Undefined) + } + } + + test("unsafeRunTimed times-out on unending IO") { + val never = IO.async[Int](_ => ()) + val start = System.currentTimeMillis() + val received = never.unsafeRunTimed(100.millis) + val elapsed = System.currentTimeMillis() - start + + received shouldEqual None + assert(elapsed >= 100) + } + + // this is expected behavior + // ...also it appears to fell the mighty Travis, so it's disabled for now + /*test("fail to provide stack safety with repeated async suspensions") { + val result = (0 until 10000).foldLeft(IO(0)) { (acc, i) => + acc.flatMap(n => IO.async[Int](_(Right(n + 1)))) + } + + intercept[StackOverflowError] { + result.unsafeRunAsync(_ => ()) + } + }*/ +} From 6f86cc949dd39900efa02fd763dd5276e1281324 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 16 Jan 2018 14:44:52 +0200 Subject: [PATCH 04/14] Add newtype and Parallel instance --- .../src/main/scala/cats/effect/IO.scala | 66 ++++++++++++++++++- .../cats/effect/internals/IOParMap.scala | 25 ++++--- .../cats/effect/util/CompositeException.scala | 37 +++++++++++ .../test/scala/cats/effect/IOJVMTests.scala | 14 +++- .../effect/laws/discipline/Arbitrary.scala | 3 + .../cats/effect/laws/util/TestInstances.scala | 10 +++ .../test/scala/cats/effect/IOAsyncTests.scala | 1 - .../src/test/scala/cats/effect/IOTests.scala | 61 +++++++++++++++++ 8 files changed, 205 insertions(+), 12 deletions(-) create mode 100644 core/shared/src/main/scala/cats/effect/util/CompositeException.scala diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 7bd5d0af2f..6cf4f2f420 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -17,9 +17,11 @@ package cats package effect +import cats.arrow.FunctionK import cats.effect.internals.IOFrame.ErrorHandler -import cats.effect.internals.{IOFrame, IOPlatform, IORunLoop, NonFatal} +import cats.effect.internals._ import cats.effect.internals.IOPlatform.fusionMaxStackDepth + import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.duration._ @@ -283,8 +285,31 @@ sealed abstract class IO[+A] { } } -private[effect] abstract class IOLowPriorityInstances { +private[effect] abstract class IOParallelNewtype { + /** Newtype encoding for an `IO` datatype that has a [[cats.Applicative]] + * capable of doing parallel processing in `ap` and `map2`, needed + * for implementing [[cats.Parallel]]. + * + * Helpers are provided as extension methods for converting back and forth + * in [[IO.IOExtensions.toPar .toPar]] and [[IO.ParExtensions.toIO .toIO]]. + * + * The encoding is inspired based on the "newtypes" project by + * Alexander Konovalov, chosen because it's devoid of boxing issues and + * a good choice until opaque types will land in Scala. + */ + type Par[+A] = Par.Type[A] + /** Newtype encoding, see the [[IO.Par]] type alias + * for more details. + */ + object Par { + type Base + trait Tag extends Any + type Type[+A] <: Base with Tag + } +} + +private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype { private[effect] class IOSemigroup[A: Semigroup] extends Semigroup[IO[A]] { def combine(ioa1: IO[A], ioa2: IO[A]) = ioa1.flatMap(a1 => ioa2.map(a2 => Semigroup[A].combine(a1, a2))) @@ -295,6 +320,21 @@ private[effect] abstract class IOLowPriorityInstances { private[effect] abstract class IOInstances extends IOLowPriorityInstances { + implicit val parApplicative: Applicative[IO.Par] = new Applicative[IO.Par] { + override def pure[A](x: A): IO.Par[A] = + IO.pure(x).toPar + override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = + IOParMap(fa.toIO, fb.toIO)(f).toPar + override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] = + map2(ff, fa)(_(_)) + override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] = + map2(fa, fb)((_, _)) + override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] = + fa.toIO.map(f).toPar + override def unit: IO.Par[Unit] = + IO.unit.toPar + } + implicit val ioEffect: Effect[IO] = new Effect[IO] { override def pure[A](a: A): IO[A] = IO.pure(a) @@ -332,6 +372,18 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances { } } + implicit val ioParallel: Parallel[IO, IO.Par] = + new Parallel[IO, IO.Par] { + override def applicative: Applicative[IO.Par] = + parApplicative + override def monad: Monad[IO] = + ioEffect + override def sequential: ~>[IO.Par, IO] = + new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = fa.toIO } + override def parallel: ~>[IO, IO.Par] = + new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = fa.toPar } + } + implicit def ioMonoid[A: Monoid]: Monoid[IO[A]] = new IOSemigroup[A] with Monoid[IO[A]] { def empty = IO.pure(Monoid[A].empty) } @@ -610,4 +662,14 @@ object IO extends IOInstances { override def recover(e: Throwable) = Pure(Left(e)) } + + implicit final class IOExtensions[+A](val self: IO[A]) extends AnyVal { + /** Wraps any `IO` value into an [[IO.Par]] type. */ + def toPar: IO.Par[A] = self.asInstanceOf[Par[A]] + } + + implicit final class ParExtensions[+A](val self: Par[A]) extends AnyVal { + /** Unwraps `IO.Par` values back to the base [[IO]] type. */ + def toIO: IO[A] = self.asInstanceOf[IO[A]] + } } diff --git a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala index 505d12c750..1abd7a63ee 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala @@ -18,6 +18,7 @@ package cats.effect.internals import java.util.concurrent.atomic.AtomicReference import cats.effect.IO +import cats.effect.util.CompositeException import scala.concurrent.ExecutionContext private[effect] object IOParMap { @@ -28,7 +29,7 @@ private[effect] object IOParMap { IO.async { cb => // For preventing stack-overflow errors; using a // trampolined execution context, so no thread forks - private implicit val ec: ExecutionContext = TrampolineEC.immediate + implicit val ec: ExecutionContext = TrampolineEC.immediate // Light async boundary to prevent SO errors ec.execute(new Runnable { @@ -44,13 +45,15 @@ private[effect] object IOParMap { case Right(b) => cb(try Right(f(a, b)) catch { case NonFatal(e) => Left(e) }) case error @ Left(_) => - cb(error) + cb(error.asInstanceOf[Left[Throwable, C]]) } - case error @ Left(_) => - cb(error) + case left @ Left(e1) => rb match { - case Left(error2) => throw error2 - case _ => () + case Right(_) => + cb(left.asInstanceOf[Left[Throwable, C]]) + case Left(e2) => + // Signaling both errors + cb(Left(new CompositeException(e1, e2))) } } }) @@ -61,7 +64,10 @@ private[effect] object IOParMap { state.getAndSet(Left(attemptA)) match { case null => () // wait for B case Right(attemptB) => complete(attemptA, attemptB) - case left => throw new IllegalStateException(s"parMap: $left") + case left => + // $COVERAGE-OFF$ + throw new IllegalStateException(s"parMap: $left") + // $COVERAGE-ON$ } } // Second execution @@ -70,7 +76,10 @@ private[effect] object IOParMap { state.getAndSet(Right(attemptB)) match { case null => () // wait for B case Left(attemptA) => complete(attemptA, attemptB) - case right => throw new IllegalStateException(s"parMap: $right") + case right => + // $COVERAGE-OFF$ + throw new IllegalStateException(s"parMap: $right") + // $COVERAGE-ON$ } } } diff --git a/core/shared/src/main/scala/cats/effect/util/CompositeException.scala b/core/shared/src/main/scala/cats/effect/util/CompositeException.scala new file mode 100644 index 0000000000..1e60723309 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/util/CompositeException.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.util + +/** A composite exception represents a list of exceptions + * caught from evaluating multiple independent IO actions + * and that need to be signaled together. + */ +class CompositeException(val errors: List[Throwable]) + extends RuntimeException() with Serializable { + + def this(args: Throwable*) = this(args.toList) + + override def toString: String = { + getClass.getName + ( + if (errors.isEmpty) "" else { + val (first, last) = errors.splitAt(2) + val str = first.map(e => s"${e.getClass.getName}: ${e.getMessage}").mkString(", ") + val reasons = if (last.nonEmpty) str + "..." else str + "(" + reasons + ")" + }) + } +} diff --git a/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala b/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala index 049b292573..a89b34ee5f 100644 --- a/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala +++ b/laws/jvm/src/test/scala/cats/effect/IOJVMTests.scala @@ -17,7 +17,7 @@ package cats.effect import org.scalatest._ - +import cats.syntax.all._ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ @@ -85,6 +85,18 @@ class IOJVMTests extends FunSuite with Matchers { assert(elapsed >= 100) } + test("parMap2 concurrently") { + import scala.concurrent.ExecutionContext.Implicits.global + + val io1 = IO.shift *> IO(1) + val io2 = IO.shift *> IO(2) + + for (_ <- 0 until 1000) { + val r = (io1, io2).parMapN(_ + _).unsafeRunSync + r shouldEqual 3 + } + } + // this is expected behavior // ...also it appears to fell the mighty Travis, so it's disabled for now /*test("fail to provide stack safety with repeated async suspensions") { diff --git a/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala b/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala index b1dd064bc5..0b07b6c668 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala @@ -29,6 +29,9 @@ object arbitrary { implicit def catsEffectLawsArbitraryForIO[A: Arbitrary: Cogen]: Arbitrary[IO[A]] = Arbitrary(Gen.delay(genIO[A])) + implicit def catsEffectLawsArbitraryForIOParallel[A: Arbitrary: Cogen]: Arbitrary[IO.Par[A]] = + Arbitrary(catsEffectLawsArbitraryForIO[A].arbitrary.map(_.toPar)) + def genIO[A: Arbitrary: Cogen]: Gen[IO[A]] = { Gen.frequency( 5 -> genPure[A], diff --git a/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala b/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala index a37dedf088..4a22fc63ab 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala @@ -39,6 +39,16 @@ trait TestInstances { eqFuture[A].eqv(x.unsafeToFuture(), y.unsafeToFuture()) } + /** + * Defines equality for `IO.Par` references that can + * get interpreted by means of a [[TestContext]]. + */ + implicit def eqIOPar[A](implicit A: Eq[A], ec: TestContext): Eq[IO.Par[A]] = + new Eq[IO.Par[A]] { + def eqv(x: IO.Par[A], y: IO.Par[A]): Boolean = + eqFuture[A].eqv(x.toIO.unsafeToFuture(), y.toIO.unsafeToFuture()) + } + /** * Defines equality for `Future` references that can * get interpreted by means of a [[TestContext]]. diff --git a/laws/shared/src/test/scala/cats/effect/IOAsyncTests.scala b/laws/shared/src/test/scala/cats/effect/IOAsyncTests.scala index f768647a03..cdd8eb4458 100644 --- a/laws/shared/src/test/scala/cats/effect/IOAsyncTests.scala +++ b/laws/shared/src/test/scala/cats/effect/IOAsyncTests.scala @@ -18,7 +18,6 @@ package cats.effect import org.scalactic.source.Position import org.scalatest.{Assertion, AsyncFunSuite, Matchers} - import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success, Try} diff --git a/laws/shared/src/test/scala/cats/effect/IOTests.scala b/laws/shared/src/test/scala/cats/effect/IOTests.scala index 7140fd57ef..d05c31f2d0 100644 --- a/laws/shared/src/test/scala/cats/effect/IOTests.scala +++ b/laws/shared/src/test/scala/cats/effect/IOTests.scala @@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger import cats.effect.internals.IOPlatform import cats.effect.laws.discipline.EffectTests import cats.effect.laws.discipline.arbitrary._ +import cats.effect.laws.util.TestContext +import cats.effect.util.CompositeException import cats.implicits._ import cats.kernel.laws.discipline.MonoidTests import cats.laws._ @@ -38,6 +40,9 @@ class IOTests extends BaseTestsSuite { checkAllAsync("IO", implicit ec => MonoidTests[IO[Int]].monoid) checkAllAsync("IO", implicit ec => SemigroupKTests[IO].semigroupK[Int]) + checkAllAsync("IO.Par", implicit ec => ApplicativeTests[IO.Par].applicative[Int, Int, Int]) + checkAllAsync("IO", implicit ec => ParallelTests[IO, IO.Par].parallel[Int, Int]) + test("defer evaluation until run") { var run = false val ioa = IO { run = true } @@ -412,6 +417,62 @@ class IOTests extends BaseTestsSuite { io.unsafeRunSync() shouldEqual max * 10000 } + + test("parMap2 for successful values") { + implicit val ec = TestContext() + val io1 = IO.shift *> IO.pure(1) + val io2 = IO.shift *> IO.pure(2) + + val io3 = (io1, io2).parMapN(_ + _) + val f = io3.unsafeToFuture() + ec.tick() + f.value shouldEqual Some(Success(3)) + } + + test("parMap2 can fail for one") { + implicit val ec = TestContext() + val dummy = new RuntimeException("dummy") + val io1 = IO.shift *> IO.pure(1) + val io2 = IO.shift *> IO.raiseError[Int](dummy) + + val io3 = (io1, io2).parMapN(_ + _) + val f1 = io3.unsafeToFuture() + + ec.tick() + f1.value shouldEqual Some(Failure(dummy)) + + val io4 = (io2, io1).parMapN(_ + _) + val f2 = io4.unsafeToFuture() + + ec.tick() + f2.value shouldEqual Some(Failure(dummy)) + } + + test("parMap2 can fail for both") { + implicit val ec = TestContext() + val dummy1 = new RuntimeException("dummy1") + val dummy2 = new RuntimeException("dummy2") + + val io1 = IO.shift *> IO.raiseError[Int](dummy1) + val io2 = IO.shift *> IO.raiseError[Int](dummy2) + + val io3 = (io1, io2).parMapN(_ + _) + val f1 = io3.unsafeToFuture() + ec.tick() + + val errors = f1.value.collect { + case Failure(ref: CompositeException) => ref.errors + } + + errors shouldBe Some(List(dummy1, dummy2)) + } + + test("parMap2 is stack safe") { + val count = if (IOPlatform.isJVM) 100000 else 5000 + val io = (0 until count).foldLeft(IO(0))((acc, e) => (acc, IO(e)).parMapN(_ + _)) + + io.unsafeRunSync() shouldEqual (count * (count - 1) / 2) + } } object IOTests { From 1a445338106fdad54e8dea119827075738eeaf1a Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 16 Jan 2018 14:49:29 +0200 Subject: [PATCH 05/14] Fix comment --- core/shared/src/main/scala/cats/effect/internals/IOParMap.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala index 1abd7a63ee..b2d863749b 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala @@ -74,7 +74,7 @@ private[effect] object IOParMap { fb.unsafeRunAsync { attemptB => // Using Java 8 platform intrinsics state.getAndSet(Right(attemptB)) match { - case null => () // wait for B + case null => () // wait for A case Left(attemptA) => complete(attemptA, attemptB) case right => // $COVERAGE-OFF$ From 2362fcb098f2a2680c81c1773851e376963fb503 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 16 Jan 2018 14:51:11 +0200 Subject: [PATCH 06/14] Fix comment --- core/shared/src/main/scala/cats/effect/IO.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 6cf4f2f420..d9d6e67813 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -293,7 +293,7 @@ private[effect] abstract class IOParallelNewtype { * Helpers are provided as extension methods for converting back and forth * in [[IO.IOExtensions.toPar .toPar]] and [[IO.ParExtensions.toIO .toIO]]. * - * The encoding is inspired based on the "newtypes" project by + * The encoding is based on the "newtypes" project by * Alexander Konovalov, chosen because it's devoid of boxing issues and * a good choice until opaque types will land in Scala. */ From aa38bd21940d85a8ce3bced32a31b1e5b4c39819 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 16 Jan 2018 15:06:03 +0200 Subject: [PATCH 07/14] Fix test for JS --- laws/shared/src/test/scala/cats/effect/IOTests.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/laws/shared/src/test/scala/cats/effect/IOTests.scala b/laws/shared/src/test/scala/cats/effect/IOTests.scala index d05c31f2d0..ea73dbd580 100644 --- a/laws/shared/src/test/scala/cats/effect/IOTests.scala +++ b/laws/shared/src/test/scala/cats/effect/IOTests.scala @@ -471,7 +471,8 @@ class IOTests extends BaseTestsSuite { val count = if (IOPlatform.isJVM) 100000 else 5000 val io = (0 until count).foldLeft(IO(0))((acc, e) => (acc, IO(e)).parMapN(_ + _)) - io.unsafeRunSync() shouldEqual (count * (count - 1) / 2) + val f = io.unsafeToFuture() + f.value shouldEqual Some(Success(count * (count - 1) / 2)) } } From 29a7e2307c71b51fc5f22121edba7119210e5bc5 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 16 Jan 2018 15:20:57 +0200 Subject: [PATCH 08/14] Fix ScalaDoc --- core/shared/src/main/scala/cats/effect/IO.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index d9d6e67813..98c78f47f9 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -286,9 +286,9 @@ sealed abstract class IO[+A] { } private[effect] abstract class IOParallelNewtype { - /** Newtype encoding for an `IO` datatype that has a [[cats.Applicative]] - * capable of doing parallel processing in `ap` and `map2`, needed - * for implementing [[cats.Parallel]]. + /** Newtype encoding for an `IO` datatype that has a `cats.Applicative` + * capable of doing parallel processing in ap` and `map2`, needed + * for implementing `cats.Parallel`. * * Helpers are provided as extension methods for converting back and forth * in [[IO.IOExtensions.toPar .toPar]] and [[IO.ParExtensions.toIO .toIO]]. From 368cca30bf82fbd427042b5795dd37e1220f30d2 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Wed, 17 Jan 2018 10:22:48 +0200 Subject: [PATCH 09/14] Fix newtype for Scala 2.10, modify CompositeException --- .../src/main/scala/cats/effect/IO.scala | 6 +-- .../cats/effect/internals/IOParMap.scala | 2 +- .../cats/effect/internals/Newtype1.scala | 34 +++++++++++++++++ .../cats/effect/util/CompositeException.scala | 37 ++++++++++++------- .../effect/laws/discipline/Arbitrary.scala | 2 +- .../cats/effect/laws/util/TestInstances.scala | 16 +++++--- .../src/test/scala/cats/effect/IOTests.scala | 6 ++- 7 files changed, 76 insertions(+), 27 deletions(-) create mode 100644 core/shared/src/main/scala/cats/effect/internals/Newtype1.scala diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 98c78f47f9..4778eaccdc 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -302,11 +302,7 @@ private[effect] abstract class IOParallelNewtype { /** Newtype encoding, see the [[IO.Par]] type alias * for more details. */ - object Par { - type Base - trait Tag extends Any - type Type[+A] <: Base with Tag - } + object Par extends Newtype1 } private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype { diff --git a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala index b2d863749b..9730c28e79 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala @@ -53,7 +53,7 @@ private[effect] object IOParMap { cb(left.asInstanceOf[Left[Throwable, C]]) case Left(e2) => // Signaling both errors - cb(Left(new CompositeException(e1, e2))) + cb(Left(CompositeException(e1, e2))) } } }) diff --git a/core/shared/src/main/scala/cats/effect/internals/Newtype1.scala b/core/shared/src/main/scala/cats/effect/internals/Newtype1.scala new file mode 100644 index 0000000000..0f719c349d --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/internals/Newtype1.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2017 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +/** INTERNAL API — Newtype encoding, used for defining `IO.Par`. + * + * The `Newtype1` abstract class indirection is needed for Scala 2.10, + * otherwise we could just define these types straight on the + * `IO.Par` companion object. In Scala 2.10 definining these types + * straight on the companion object yields an error like: + * ''"only classes can have declared but undefined members"''. + * + * Inspired by + * [[https://github.com/alexknvl/newtypes alexknvl/newtypes]]. + */ +private[effect] abstract class Newtype1 { + type Base + trait Tag extends Any + type Type[+A] <: Base with Tag +} diff --git a/core/shared/src/main/scala/cats/effect/util/CompositeException.scala b/core/shared/src/main/scala/cats/effect/util/CompositeException.scala index 1e60723309..06325b18ff 100644 --- a/core/shared/src/main/scala/cats/effect/util/CompositeException.scala +++ b/core/shared/src/main/scala/cats/effect/util/CompositeException.scala @@ -16,22 +16,31 @@ package cats.effect.util +import cats.data.NonEmptyList + /** A composite exception represents a list of exceptions - * caught from evaluating multiple independent IO actions + * caught from evaluating multiple independent actions * and that need to be signaled together. + * + * Note the constructor doesn't allow wrapping anything less + * than two throwable references. + * + * Use [[cats.effect.util.CompositeException.apply apply]] + * for building composite exceptions. */ -class CompositeException(val errors: List[Throwable]) - extends RuntimeException() with Serializable { - - def this(args: Throwable*) = this(args.toList) +final class CompositeException(val head: Throwable, val tail: NonEmptyList[Throwable]) + extends RuntimeException( + s"Multiple exceptions were thrown (${1 + tail.size}), " + + s"first ${head.getClass.getName}: ${head.getMessage}") + with Serializable { - override def toString: String = { - getClass.getName + ( - if (errors.isEmpty) "" else { - val (first, last) = errors.splitAt(2) - val str = first.map(e => s"${e.getClass.getName}: ${e.getMessage}").mkString(", ") - val reasons = if (last.nonEmpty) str + "..." else str - "(" + reasons + ")" - }) - } + /** Returns the set of all errors wrapped by this composite. */ + def all: NonEmptyList[Throwable] = + head :: tail } + +object CompositeException { + /** Simple builder for [[CompositeException]]. */ + def apply(first: Throwable, second: Throwable, rest: List[Throwable] = Nil): CompositeException = + new CompositeException(first, NonEmptyList(second, rest)) +} \ No newline at end of file diff --git a/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala b/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala index 0b07b6c668..18147e6ff0 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala @@ -91,5 +91,5 @@ object arbitrary { } yield ioa.map(f1).map(f2) implicit def catsEffectLawsCogenForIO[A](implicit cgfa: Cogen[Future[A]]): Cogen[IO[A]] = - cgfa.contramap((ioa: IO[A]) => ioa.unsafeToFuture) + cgfa.contramap((ioa: IO[A]) => ioa.unsafeToFuture()) } diff --git a/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala b/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala index 4a22fc63ab..441fce00f8 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala @@ -17,7 +17,10 @@ package cats.effect.laws.util import cats.effect.IO +import cats.effect.util.CompositeException import cats.kernel.Eq + +import scala.annotation.tailrec import scala.concurrent.{ExecutionException, Future} import scala.util.{Failure, Success} @@ -79,12 +82,15 @@ trait TestInstances { // Unwraps exceptions that got caught by Future's implementation // and that got wrapped in ExecutionException (`Future(throw ex)`) - def extractEx(ex: Throwable): String = { - var ref = ex - while (ref.isInstanceOf[ExecutionException] && ref.getCause != null) { - ref = ref.getCause + @tailrec def extractEx(ex: Throwable): String = { + ex match { + case e: ExecutionException if e.getCause != null => + extractEx(e.getCause) + case e: CompositeException => + extractEx(e.head) + case _ => + s"${ex.getClass.getName}: ${ex.getMessage}" } - s"${ref.getClass.getName}: ${ref.getMessage}" } } } diff --git a/laws/shared/src/test/scala/cats/effect/IOTests.scala b/laws/shared/src/test/scala/cats/effect/IOTests.scala index ea73dbd580..7c31b882d5 100644 --- a/laws/shared/src/test/scala/cats/effect/IOTests.scala +++ b/laws/shared/src/test/scala/cats/effect/IOTests.scala @@ -43,6 +43,10 @@ class IOTests extends BaseTestsSuite { checkAllAsync("IO.Par", implicit ec => ApplicativeTests[IO.Par].applicative[Int, Int, Int]) checkAllAsync("IO", implicit ec => ParallelTests[IO, IO.Par].parallel[Int, Int]) + test("IO.Par's applicative instance is different") { + implicitly[Applicative[IO]] shouldNot be(implicitly[Applicative[IO.Par]]) + } + test("defer evaluation until run") { var run = false val ioa = IO { run = true } @@ -461,7 +465,7 @@ class IOTests extends BaseTestsSuite { ec.tick() val errors = f1.value.collect { - case Failure(ref: CompositeException) => ref.errors + case Failure(ref: CompositeException) => ref.all.toList } errors shouldBe Some(List(dummy1, dummy2)) From a91c9d8888ae9848f8696508618bed3fb2a8103f Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Wed, 17 Jan 2018 10:33:09 +0200 Subject: [PATCH 10/14] Fix scaladoc --- core/shared/src/main/scala/cats/effect/IO.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 4778eaccdc..7abcd06505 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -287,7 +287,7 @@ sealed abstract class IO[+A] { private[effect] abstract class IOParallelNewtype { /** Newtype encoding for an `IO` datatype that has a `cats.Applicative` - * capable of doing parallel processing in ap` and `map2`, needed + * capable of doing parallel processing in `ap` and `map2`, needed * for implementing `cats.Parallel`. * * Helpers are provided as extension methods for converting back and forth From 3f510f19f7e79f63a082a96e6176548c82eca94f Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Wed, 17 Jan 2018 10:49:33 +0200 Subject: [PATCH 11/14] Make FunctionK vals in Parallel[IO] --- core/shared/src/main/scala/cats/effect/IO.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 7abcd06505..11de28b541 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -374,9 +374,9 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances { parApplicative override def monad: Monad[IO] = ioEffect - override def sequential: ~>[IO.Par, IO] = + override val sequential: ~>[IO.Par, IO] = new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = fa.toIO } - override def parallel: ~>[IO, IO.Par] = + override val parallel: ~>[IO, IO.Par] = new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = fa.toPar } } From 73648aa73ec0d1edda6ac9dfcceade6c8866d4fa Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Wed, 17 Jan 2018 12:22:36 +0200 Subject: [PATCH 12/14] Got rid of extension methods --- .../src/main/scala/cats/effect/IO.scala | 31 +++++++------------ .../{Newtype1.scala => IONewtype.scala} | 18 ++++++++--- .../effect/laws/discipline/Arbitrary.scala | 4 +-- .../cats/effect/laws/util/TestInstances.scala | 3 +- 4 files changed, 29 insertions(+), 27 deletions(-) rename core/shared/src/main/scala/cats/effect/internals/{Newtype1.scala => IONewtype.scala} (68%) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 11de28b541..ae6809729b 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -290,8 +290,8 @@ private[effect] abstract class IOParallelNewtype { * capable of doing parallel processing in `ap` and `map2`, needed * for implementing `cats.Parallel`. * - * Helpers are provided as extension methods for converting back and forth - * in [[IO.IOExtensions.toPar .toPar]] and [[IO.ParExtensions.toIO .toIO]]. + * Helpers are provided for converting back and forth in [[Par.apply]] + * for wrapping any `IO` value and [[Par.unwrap]] for unwrapping. * * The encoding is based on the "newtypes" project by * Alexander Konovalov, chosen because it's devoid of boxing issues and @@ -302,7 +302,7 @@ private[effect] abstract class IOParallelNewtype { /** Newtype encoding, see the [[IO.Par]] type alias * for more details. */ - object Par extends Newtype1 + object Par extends IONewtype } private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype { @@ -317,18 +317,21 @@ private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype private[effect] abstract class IOInstances extends IOLowPriorityInstances { implicit val parApplicative: Applicative[IO.Par] = new Applicative[IO.Par] { + import IO.Par.unwrap + import IO.Par.{apply => par} + override def pure[A](x: A): IO.Par[A] = - IO.pure(x).toPar + par(IO.pure(x)) override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] = - IOParMap(fa.toIO, fb.toIO)(f).toPar + par(IOParMap(unwrap(fa), unwrap(fb))(f)) override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] = map2(ff, fa)(_(_)) override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] = map2(fa, fb)((_, _)) override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] = - fa.toIO.map(f).toPar + par(unwrap(fa).map(f)) override def unit: IO.Par[Unit] = - IO.unit.toPar + par(IO.unit) } implicit val ioEffect: Effect[IO] = new Effect[IO] { @@ -375,9 +378,9 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances { override def monad: Monad[IO] = ioEffect override val sequential: ~>[IO.Par, IO] = - new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = fa.toIO } + new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = IO.Par.unwrap(fa) } override val parallel: ~>[IO, IO.Par] = - new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = fa.toPar } + new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = IO.Par(fa) } } implicit def ioMonoid[A: Monoid]: Monoid[IO[A]] = new IOSemigroup[A] with Monoid[IO[A]] { @@ -658,14 +661,4 @@ object IO extends IOInstances { override def recover(e: Throwable) = Pure(Left(e)) } - - implicit final class IOExtensions[+A](val self: IO[A]) extends AnyVal { - /** Wraps any `IO` value into an [[IO.Par]] type. */ - def toPar: IO.Par[A] = self.asInstanceOf[Par[A]] - } - - implicit final class ParExtensions[+A](val self: Par[A]) extends AnyVal { - /** Unwraps `IO.Par` values back to the base [[IO]] type. */ - def toIO: IO[A] = self.asInstanceOf[IO[A]] - } } diff --git a/core/shared/src/main/scala/cats/effect/internals/Newtype1.scala b/core/shared/src/main/scala/cats/effect/internals/IONewtype.scala similarity index 68% rename from core/shared/src/main/scala/cats/effect/internals/Newtype1.scala rename to core/shared/src/main/scala/cats/effect/internals/IONewtype.scala index 0f719c349d..745f037905 100644 --- a/core/shared/src/main/scala/cats/effect/internals/Newtype1.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IONewtype.scala @@ -16,19 +16,27 @@ package cats.effect.internals +import cats.effect.IO + /** INTERNAL API — Newtype encoding, used for defining `IO.Par`. * - * The `Newtype1` abstract class indirection is needed for Scala 2.10, + * The `IONewtype` abstract class indirection is needed for Scala 2.10, * otherwise we could just define these types straight on the - * `IO.Par` companion object. In Scala 2.10 definining these types - * straight on the companion object yields an error like: + * `IO.Par` companion object. In Scala 2.10 defining these types + * straight on the companion object yields an error like * ''"only classes can have declared but undefined members"''. * * Inspired by * [[https://github.com/alexknvl/newtypes alexknvl/newtypes]]. */ -private[effect] abstract class Newtype1 { +private[effect] abstract class IONewtype { self => type Base trait Tag extends Any - type Type[+A] <: Base with Tag + type Type[+A] <: Base with Tag + + def apply[A](fa: IO[A]): Type[A] = + fa.asInstanceOf[Type[A]] + + def unwrap[A](fa: Type[A]): IO[A] = + fa.asInstanceOf[IO[A]] } diff --git a/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala b/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala index 18147e6ff0..779330f0d0 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/discipline/Arbitrary.scala @@ -19,9 +19,9 @@ package effect package laws package discipline +import cats.effect.IO.Par import org.scalacheck._ import org.scalacheck.Arbitrary.{arbitrary => getArbitrary} - import scala.concurrent.Future import scala.util.Either @@ -30,7 +30,7 @@ object arbitrary { Arbitrary(Gen.delay(genIO[A])) implicit def catsEffectLawsArbitraryForIOParallel[A: Arbitrary: Cogen]: Arbitrary[IO.Par[A]] = - Arbitrary(catsEffectLawsArbitraryForIO[A].arbitrary.map(_.toPar)) + Arbitrary(catsEffectLawsArbitraryForIO[A].arbitrary.map(Par.apply)) def genIO[A: Arbitrary: Cogen]: Gen[IO[A]] = { Gen.frequency( diff --git a/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala b/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala index 441fce00f8..d3f2f41875 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/util/TestInstances.scala @@ -48,8 +48,9 @@ trait TestInstances { */ implicit def eqIOPar[A](implicit A: Eq[A], ec: TestContext): Eq[IO.Par[A]] = new Eq[IO.Par[A]] { + import IO.Par.unwrap def eqv(x: IO.Par[A], y: IO.Par[A]): Boolean = - eqFuture[A].eqv(x.toIO.unsafeToFuture(), y.toIO.unsafeToFuture()) + eqFuture[A].eqv(unwrap(x).unsafeToFuture(), unwrap(y).unsafeToFuture()) } /** From fd1352fe839fe6778c10bf2a96745bc0339fa27e Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Wed, 17 Jan 2018 17:52:48 +0200 Subject: [PATCH 13/14] Fix Scaladoc --- core/shared/src/main/scala/cats/effect/IO.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index ae6809729b..617a83cf49 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -290,8 +290,8 @@ private[effect] abstract class IOParallelNewtype { * capable of doing parallel processing in `ap` and `map2`, needed * for implementing `cats.Parallel`. * - * Helpers are provided for converting back and forth in [[Par.apply]] - * for wrapping any `IO` value and [[Par.unwrap]] for unwrapping. + * Helpers are provided for converting back and forth in [[IO.Par.apply]] + * for wrapping any IO` value and [[IO.Par.unwrap]] for unwrapping. * * The encoding is based on the "newtypes" project by * Alexander Konovalov, chosen because it's devoid of boxing issues and From 122405f51b33642b9afbd688cea417654558b29c Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Wed, 17 Jan 2018 17:56:06 +0200 Subject: [PATCH 14/14] Fix Scaladoc --- core/shared/src/main/scala/cats/effect/IO.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 617a83cf49..2fe26589b7 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -290,8 +290,8 @@ private[effect] abstract class IOParallelNewtype { * capable of doing parallel processing in `ap` and `map2`, needed * for implementing `cats.Parallel`. * - * Helpers are provided for converting back and forth in [[IO.Par.apply]] - * for wrapping any IO` value and [[IO.Par.unwrap]] for unwrapping. + * Helpers are provided for converting back and forth in `Par.apply` + * for wrapping any `IO` value and `Par.unwrap` for unwrapping. * * The encoding is based on the "newtypes" project by * Alexander Konovalov, chosen because it's devoid of boxing issues and