Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel[IO, IO.Par] implementation #115

Merged
merged 14 commits into from
Jan 17, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,9 @@ private[effect] object IOPlatform {
* The default for JavaScript is 32, from which we substract 1
* as an optimization.
*/
private[effect] final val fusionMaxStackDepth = 31
final val fusionMaxStackDepth = 31

/** Returns `true` if the underlying platform is the JVM,
* `false` if it's JavaScript. */
final val isJVM = false
}
102 changes: 102 additions & 0 deletions core/js/src/main/scala/cats/effect/internals/TrampolineEC.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2017 Typelevel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2018

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.internals

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext

/** INTERNAL API — a [[scala.concurrent.ExecutionContext]] implementation
* that executes runnables immediately, on the current thread,
* by means of a trampoline implementation.
*
* Can be used in some cases to keep the asynchronous execution
* on the current thread, as an optimization, but be warned,
* you have to know what you're doing.
*
* This is the JavaScript-specific implementation, for which we
* don't need a `ThreadLocal` or Scala's `BlockingContext`.
*/
private[effect] final class TrampolineEC private (underlying: ExecutionContext)
extends ExecutionContext {

// Starts with `null`!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you comment why vs justing using Nil? I guess you are using null to signal that you are not in a localRunLoop. Is that right? Can you comment?

Could it be NonEmptyList[Runnable] I wonder, and then use the null trick on that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the difference between null and Nil, null signaling that we aren't in a run-loop and Nil signaling that a Runnable is in progress. This optimizes for the shallow case, as the first Runnable being executed doesn't get stored and retrieved from that List.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this isn't so obvious, guess at least a code comment is needed.

private[this] var localTasks: List[Runnable] = _

override def execute(runnable: Runnable): Unit =
localTasks match {
case null =>
// If we aren't in local mode yet, start local loop
localTasks = Nil
localRunLoop(runnable)
case some =>
// If we are already in batching mode, add to stack
localTasks = runnable :: some
}


@tailrec private def localRunLoop(head: Runnable): Unit = {
try { head.run() } catch {
case NonFatal(ex) =>
// Sending everything else to the underlying context
forkTheRest(null)
reportFailure(ex)
}

localTasks match {
case null => ()
case Nil =>
localTasks = null
case h2 :: t2 =>
localTasks = t2
localRunLoop(h2)
}
}

private def forkTheRest(newLocalTasks: Nil.type): Unit = {
val rest = localTasks
localTasks = newLocalTasks

rest match {
case null | Nil => ()
case head :: tail =>
underlying.execute(new ResumeRun(head, tail))
}
}

override def reportFailure(t: Throwable): Unit =
underlying.reportFailure(t)

private final class ResumeRun(head: Runnable, rest: List[Runnable])
extends Runnable {

def run(): Unit = {
localTasks = rest
localRunLoop(head)
}
}
}

private[effect] object TrampolineEC {
/** [[TrampolineEC]] instance that executes everything
* immediately, on the current call stack.
*/
val immediate: TrampolineEC =
new TrampolineEC(new ExecutionContext {
def execute(r: Runnable): Unit = r.run()
def reportFailure(e: Throwable): Unit = throw e
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,15 @@ private[effect] object IOPlatform {
* ...
* </pre>
*/
private[effect] final val fusionMaxStackDepth =
final val fusionMaxStackDepth =
Option(System.getProperty("cats.effect.fusionMaxStackDepth", ""))
.filter(s => s != null && s.nonEmpty)
.flatMap(s => Try(s.toInt).toOption)
.filter(_ > 0)
.map(_ - 1)
.getOrElse(127)

/** Returns `true` if the underlying platform is the JVM,
* `false` if it's JavaScript. */
final val isJVM = true
}
122 changes: 122 additions & 0 deletions core/jvm/src/main/scala/cats/effect/internals/TrampolineEC.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2017 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.internals

import scala.annotation.tailrec
import scala.concurrent.{BlockContext, CanAwait, ExecutionContext}

/** INTERNAL API — a [[scala.concurrent.ExecutionContext]] implementation
* that executes runnables immediately, on the current thread,
* by means of a trampoline implementation.
*
* Can be used in some cases to keep the asynchronous execution
* on the current thread, as an optimization, but be warned,
* you have to know what you're doing.
*
* This is the JVM-specific implementation, for which we
* need a `ThreadLocal` and Scala's `BlockingContext`.
*/
private[effect] final class TrampolineEC private (underlying: ExecutionContext)
extends ExecutionContext {

private[this] val localTasks = new ThreadLocal[List[Runnable]]()

private[this] val trampolineContext: BlockContext =
new BlockContext {
def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
// In case of blocking, execute all scheduled local tasks on
// a separate thread, otherwise we could end up with a dead-lock
forkTheRest(Nil)
thunk
}
}

override def execute(runnable: Runnable): Unit =
localTasks.get match {
case null =>
// If we aren't in local mode yet, start local loop
localTasks.set(Nil)
BlockContext.withBlockContext(trampolineContext) {
localRunLoop(runnable)
}
case some =>
// If we are already in batching mode, add to stack
localTasks.set(runnable :: some)
}

@tailrec private def localRunLoop(head: Runnable): Unit = {
try { head.run() } catch {
case NonFatal(ex) =>
// Sending everything else to the underlying context
forkTheRest(null)
reportFailure(ex)
}

localTasks.get() match {
case null => ()
case Nil =>
localTasks.set(null)
case h2 :: t2 =>
localTasks.set(t2)
localRunLoop(h2)
}
}

private def forkTheRest(newLocalTasks: Nil.type): Unit = {
val rest = localTasks.get()
localTasks.set(newLocalTasks)

rest match {
case null | Nil => ()
case head :: tail =>
underlying.execute(new ResumeRun(head, tail))
}
}

override def reportFailure(t: Throwable): Unit =
underlying.reportFailure(t)

private final class ResumeRun(head: Runnable, rest: List[Runnable])
extends Runnable {

def run(): Unit = {
localTasks.set(rest)
localRunLoop(head)
}
}
}

private[effect] object TrampolineEC {
/** [[TrampolineEC]] instance that executes everything
* immediately, on the current thread.
*
* Implementation notes:
*
* - if too many `blocking` operations are chained, at some point
* the implementation will trigger a stack overflow error
* - `reportError` re-throws the exception in the hope that it
* will get caught and reported by the underlying thread-pool,
* because there's nowhere it could report that error safely
* (i.e. `System.err` might be routed to `/dev/null` and we'd
* have no way to override it)
*/
val immediate: TrampolineEC =
new TrampolineEC(new ExecutionContext {
def execute(r: Runnable): Unit = r.run()
def reportFailure(e: Throwable): Unit = throw e
})
}
55 changes: 53 additions & 2 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package cats
package effect

import cats.arrow.FunctionK
import cats.effect.internals.IOFrame.ErrorHandler
import cats.effect.internals.{IOFrame, IOPlatform, IORunLoop, NonFatal}
import cats.effect.internals._
import cats.effect.internals.IOPlatform.fusionMaxStackDepth

import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
Expand Down Expand Up @@ -283,8 +285,27 @@ sealed abstract class IO[+A] {
}
}

private[effect] abstract class IOLowPriorityInstances {
private[effect] abstract class IOParallelNewtype {
/** Newtype encoding for an `IO` datatype that has a `cats.Applicative`
* capable of doing parallel processing in `ap` and `map2`, needed
* for implementing `cats.Parallel`.
*
* Helpers are provided for converting back and forth in `Par.apply`
* for wrapping any `IO` value and `Par.unwrap` for unwrapping.
*
* The encoding is based on the "newtypes" project by
* Alexander Konovalov, chosen because it's devoid of boxing issues and
* a good choice until opaque types will land in Scala.
*/
type Par[+A] = Par.Type[A]

/** Newtype encoding, see the [[IO.Par]] type alias
* for more details.
*/
object Par extends IONewtype
}

private[effect] abstract class IOLowPriorityInstances extends IOParallelNewtype {
private[effect] class IOSemigroup[A: Semigroup] extends Semigroup[IO[A]] {
def combine(ioa1: IO[A], ioa2: IO[A]) =
ioa1.flatMap(a1 => ioa2.map(a2 => Semigroup[A].combine(a1, a2)))
Expand All @@ -295,6 +316,24 @@ private[effect] abstract class IOLowPriorityInstances {

private[effect] abstract class IOInstances extends IOLowPriorityInstances {

implicit val parApplicative: Applicative[IO.Par] = new Applicative[IO.Par] {
import IO.Par.unwrap
import IO.Par.{apply => par}

override def pure[A](x: A): IO.Par[A] =
par(IO.pure(x))
override def map2[A, B, Z](fa: IO.Par[A], fb: IO.Par[B])(f: (A, B) => Z): IO.Par[Z] =
par(IOParMap(unwrap(fa), unwrap(fb))(f))
override def ap[A, B](ff: IO.Par[A => B])(fa: IO.Par[A]): IO.Par[B] =
map2(ff, fa)(_(_))
override def product[A, B](fa: IO.Par[A], fb: IO.Par[B]): IO.Par[(A, B)] =
map2(fa, fb)((_, _))
override def map[A, B](fa: IO.Par[A])(f: A => B): IO.Par[B] =
par(unwrap(fa).map(f))
override def unit: IO.Par[Unit] =
par(IO.unit)
}

implicit val ioEffect: Effect[IO] = new Effect[IO] {
override def pure[A](a: A): IO[A] =
IO.pure(a)
Expand Down Expand Up @@ -332,6 +371,18 @@ private[effect] abstract class IOInstances extends IOLowPriorityInstances {
}
}

implicit val ioParallel: Parallel[IO, IO.Par] =
new Parallel[IO, IO.Par] {
override def applicative: Applicative[IO.Par] =
parApplicative
override def monad: Monad[IO] =
ioEffect
override val sequential: ~>[IO.Par, IO] =
new FunctionK[IO.Par, IO] { def apply[A](fa: IO.Par[A]): IO[A] = IO.Par.unwrap(fa) }
override val parallel: ~>[IO, IO.Par] =
new FunctionK[IO, IO.Par] { def apply[A](fa: IO[A]): IO.Par[A] = IO.Par(fa) }
}

implicit def ioMonoid[A: Monoid]: Monoid[IO[A]] = new IOSemigroup[A] with Monoid[IO[A]] {
def empty = IO.pure(Monoid[A].empty)
}
Expand Down
42 changes: 42 additions & 0 deletions core/shared/src/main/scala/cats/effect/internals/IONewtype.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2017 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.internals

import cats.effect.IO

/** INTERNAL API — Newtype encoding, used for defining `IO.Par`.
*
* The `IONewtype` abstract class indirection is needed for Scala 2.10,
* otherwise we could just define these types straight on the
* `IO.Par` companion object. In Scala 2.10 defining these types
* straight on the companion object yields an error like
* ''"only classes can have declared but undefined members"''.
*
* Inspired by
* [[https://github.com/alexknvl/newtypes alexknvl/newtypes]].
*/
private[effect] abstract class IONewtype { self =>
type Base
trait Tag extends Any
type Type[+A] <: Base with Tag

def apply[A](fa: IO[A]): Type[A] =
fa.asInstanceOf[Type[A]]

def unwrap[A](fa: Type[A]): IO[A] =
fa.asInstanceOf[IO[A]]
}
Loading