diff --git a/README.md b/README.md index fcc9aa8d..54c0dec3 100644 --- a/README.md +++ b/README.md @@ -38,10 +38,8 @@ To tell `Fetch` how to get the data you want, you must implement the `DataSource Data Sources take two type parameters: -
    -
  1. Identity is a type that has enough information to fetch the data. For a users data source, this would be a user's unique ID.
  2. -
  3. Result is the type of data we want to fetch. For a users data source, this would the `User` type.
  4. -
+1. `Identity` is a type that has enough information to fetch the data. For a users data source, this would be a user's unique ID. +2. `Result` is the type of data we want to fetch. For a users data source, this would the `User` type. ```scala import cats.data.NonEmptyList @@ -56,7 +54,7 @@ We'll implement a dummy data source that can convert integers to strings. For co ```scala import cats.data.NonEmptyList -import cats.std.list._ +import cats.instances.list._ import fetch._ implicit object ToStringSource extends DataSource[Int, String]{ @@ -69,7 +67,7 @@ implicit object ToStringSource extends DataSource[Int, String]{ override def fetchMany(ids: NonEmptyList[Int]): Query[Map[Int, String]] = { Query.sync({ println(s"[${Thread.currentThread.getId}] Many ToString $ids") - ids.unwrap.map(i => (i, i.toString)).toMap + ids.toList.map(i => (i, i.toString)).toMap }) } } @@ -99,7 +97,7 @@ Let's run it and wait for the fetch to complete: ```scala fetchOne.runA[Id] -// [46] One ToString 1 +// [45] One ToString 1 // res3: cats.Id[String] = 1 ``` @@ -117,7 +115,7 @@ When executing the above fetch, note how the three identities get batched and th ```scala fetchThree.runA[Id] -// [46] Many ToString OneAnd(1,List(2, 3)) +// [45] Many ToString NonEmptyList(1, 2, 3) // res5: cats.Id[(String, String, String)] = (1,2,3) ``` @@ -138,7 +136,7 @@ implicit object LengthSource extends DataSource[String, Int]{ override def fetchMany(ids: NonEmptyList[String]): Query[Map[String, Int]] = { Query.async((ok, fail) => { println(s"[${Thread.currentThread.getId}] Many Length $ids") - ok(ids.unwrap.map(i => (i, i.size)).toMap) + ok(ids.toList.map(i => (i, i.size)).toMap) }) } } @@ -156,8 +154,8 @@ Note how the two independent data fetches run in parallel, minimizing the latenc ```scala fetchMulti.runA[Id] -// [46] One ToString 1 -// [47] One Length one +// [45] One ToString 1 +// [46] One Length one // res7: cats.Id[(String, Int)] = (1,3) ``` @@ -176,6 +174,6 @@ While running it, notice that the data source is only queried once. The next tim ```scala fetchTwice.runA[Id] -// [46] One ToString 1 +// [45] One ToString 1 // res8: cats.Id[(String, String)] = (1,1) ``` diff --git a/build.sbt b/build.sbt index 660eed4b..1c1e699c 100644 --- a/build.sbt +++ b/build.sbt @@ -17,8 +17,8 @@ lazy val buildSettings = Seq( lazy val commonSettings = Seq( resolvers += Resolver.sonatypeRepo("releases"), libraryDependencies ++= Seq( - "org.typelevel" %%% "cats" % "0.6.0", - "org.scalatest" %%% "scalatest" % "3.0.0-M7" % "test", + "org.typelevel" %%% "cats" % "0.7.2", + "org.scalatest" %%% "scalatest" % "3.0.0" % "test", compilerPlugin( "org.spire-math" %% "kind-projector" % "0.7.1" ) @@ -129,7 +129,7 @@ lazy val readme = (project in file("tut")) lazy val monixSettings = ( libraryDependencies ++= Seq( - "io.monix" %%% "monix-eval" % "2.0-RC5" + "io.monix" %%% "monix-eval" % "2.0.5" ) ) diff --git a/docs/src/jekyll/_config.yml b/docs/src/jekyll/_config.yml index 4113fcd9..dfaf2c67 100644 --- a/docs/src/jekyll/_config.yml +++ b/docs/src/jekyll/_config.yml @@ -6,6 +6,8 @@ style: fetch highlight_theme: tomorrow docs: true markdown: redcarpet +cdn: + url: https://rawgit.com/47deg/microsites/cdn/ collections: tut: output: true diff --git a/docs/src/tut/docs.md b/docs/src/tut/docs.md index b89f7d86..e21e9f24 100644 --- a/docs/src/tut/docs.md +++ b/docs/src/tut/docs.md @@ -114,7 +114,7 @@ And now we're ready to write our user data source; we'll emulate a database with ```tut:silent import cats.data.NonEmptyList -import cats.std.list._ +import cats.instances.list._ import fetch._ @@ -126,6 +126,8 @@ val userDatabase: Map[UserId, User] = Map( ) implicit object UserSource extends DataSource[UserId, User]{ + override def name = "User data source" + override def fetchOne(id: UserId): Query[Option[User]] = { Query.sync({ latency(userDatabase.get(id), s"One User $id") @@ -133,7 +135,7 @@ implicit object UserSource extends DataSource[UserId, User]{ } override def fetchMany(ids: NonEmptyList[UserId]): Query[Map[UserId, User]] = { Query.sync({ - latency(userDatabase.filterKeys(ids.unwrap.contains), s"Many Users $ids") + latency(userDatabase.filterKeys(ids.toList.contains), s"Many Users $ids") }) } } @@ -336,6 +338,8 @@ val postDatabase: Map[PostId, Post] = Map( ) implicit object PostSource extends DataSource[PostId, Post]{ + override def name = "Post data source" + override def fetchOne(id: PostId): Query[Option[Post]] = { Query.sync({ latency(postDatabase.get(id), s"One Post $id") @@ -343,7 +347,7 @@ implicit object PostSource extends DataSource[PostId, Post]{ } override def fetchMany(ids: NonEmptyList[PostId]): Query[Map[PostId, Post]] = { Query.sync({ - latency(postDatabase.filterKeys(ids.unwrap.contains), s"Many Posts $ids") + latency(postDatabase.filterKeys(ids.toList.contains), s"Many Posts $ids") }) } } @@ -367,6 +371,8 @@ We'll implement a data source for retrieving a post topic given a post id. ```tut:silent implicit object PostTopicSource extends DataSource[Post, PostTopic]{ + override def name = "Post topic data source" + override def fetchOne(id: Post): Query[Option[PostTopic]] = { Query.sync({ val topic = if (id.id % 2 == 0) "monad" else "applicative" @@ -375,7 +381,7 @@ implicit object PostTopicSource extends DataSource[Post, PostTopic]{ } override def fetchMany(ids: NonEmptyList[Post]): Query[Map[Post, PostTopic]] = { Query.sync({ - val result = ids.unwrap.map(id => (id, if (id.id % 2 == 0) "monad" else "applicative")).toMap + val result = ids.toList.map(id => (id, if (id.id % 2 == 0) "monad" else "applicative")).toMap latency(result, s"Many Post Topics $ids") }) } @@ -455,7 +461,7 @@ combinator. It takes a `List[Fetch[A]]` and gives you back a `Fetch[List[A]]`, b data source and running fetches to different sources in parallel. Note that the `sequence` combinator is more general and works not only on lists but on any type that has a [Traverse](http://typelevel.org/cats/tut/traverse.html) instance. ```tut:silent -import cats.std.list._ +import cats.instances.list._ import cats.syntax.traverse._ val fetchSequence: Fetch[List[User]] = List(getUser(1), getUser(2), getUser(3)).sequence @@ -575,16 +581,28 @@ fetchSameTwice.runA[Id](ForgetfulCache()) # Error handling -Fetch is used for reading data from remote sources and the queries we perform can and will fail at some point. What happens if we run a fetch and fails? We'll create a fetch that always fails to learn about it. +Fetch is used for reading data from remote sources and the queries we perform can and will fail at some point. There are many things that can +go wrong: + - an exception can be thrown by client code of certain data sources + - an identity may be missing + - the data source may be temporarily available + +Since the error cases are plenty and can't be anticipated Fetch errors are represented by the `FetchException` trait, which extends `Throwable`. +Currently fetch defines `FetchException` cases for missing identities and arbitrary exceptions but you can extend `FetchException` with any error +you want. + +## Exceptions + +What happens if we run a fetch and fails with an exception? We'll create a fetch that always fails to learn about it. ```tut:silent -val fetchError: Fetch[User] = (new Exception("Oh noes")).fetch +val fetchException: Fetch[User] = (new Exception("Oh noes")).fetch ``` -If we try to execute to `Id` the exception will be thrown. +If we try to execute to `Id` the exception will be thrown wrapped in a `FetchException`. ```tut:fail -fetchError.runA[Id] +fetchException.runA[Id] ``` Since `Id` runs the fetch eagerly, the only way to recover from errors when running it is surrounding it with a `try-catch` block. We'll use Cats' `Eval` type as the target @@ -596,13 +614,13 @@ We can use the `FetchMonadError[Eval]#attempt` to convert a fetch result into a import fetch.unsafe.implicits._ ``` -Now we can convert `Eval[User]` into `Eval[Throwable Xor User]` and capture exceptions as values in the left of the disjunction. +Now we can convert `Eval[User]` into `Eval[FetchException Xor User]` and capture exceptions as values in the left of the disjunction. ```tut:book import cats.Eval import cats.data.Xor -val safeResult: Eval[Throwable Xor User] = FetchMonadError[Eval].attempt(fetchError.runA[Eval]) +val safeResult: Eval[FetchException Xor User] = FetchMonadError[Eval].attempt(fetchException.runA[Eval]) safeResult.value ``` @@ -612,17 +630,66 @@ And more succintly with Cats' applicative error syntax. ```tut:book import cats.syntax.applicativeError._ -fetchError.runA[Eval].attempt.value +fetchException.runA[Eval].attempt.value ``` ## Missing identities -You've probably noticed that `DataSource.fetch` takes a list of identities and returns a map of identities to their result, taking -into account the possibility of some identities not being found. Whenever an identity cannot be found, the fetch execution will -fail. +You've probably noticed that `DataSource.fetchOne` and `DataSource.fetchMany` return types help Fetch know if any requested +identity was not found. Whenever an identity cannot be found, the fetch execution will fail with an instance of `FetchException`. + +The requests can be of different types, each of which is described below. + +### One request + +When a single identity is being fetched the request will be a `FetchOne`; it contains the data source and the identity to fetch so you +should be able to easily diagnose the failure. For ilustrating this scenario we'll ask for users that are not in the database. + +```tut:silent +val missingUser = getUser(5) +val eval: Eval[FetchException Xor User] = missingUser.runA[Eval].attempt +val result: FetchException Xor User = eval.value +val err: FetchException = result.swap.toOption.get // don't do this at home, folks +``` + +`NotFound` allows you to access the fetch request that was in progress when the error happened and the environment of the fetch. + +```tut:book +err match { + case nf: NotFound => { + println("Request " + nf.request) + println("Environment " + nf.env) + } +} +``` + +As you can see in the output, the error was actually a `NotFound`. We can access the request with `.request`, which lets us +know that the failed request was for the identity `5` of the user data source. We can also see that the environment has an empty +cache and no rounds of execution happened yet. + +### Multiple requests + +When multiple requests to the same data source are batched and/or multiple requests are performed at the same time, is possible that more than one identity was missing. There is another error case for such situations: `MissingIdentities`, which contains a mapping from data source names to the list of missing identities. + +```tut:silent +val missingUsers = List(3, 4, 5, 6).traverse(getUser) +val eval: Eval[FetchException Xor List[User]] = missingUsers.runA[Eval].attempt +val result: FetchException Xor List[User] = eval.value +val err: FetchException = result.swap.toOption.get // don't do this at home, folks +``` + +The `.missing` attribute will give us the mapping from data source name to missing identities, and `.env` will give us the environment so we can track the execution of the fetch. + +```tut:book +err match { + case mi: MissingIdentities => { + println("Missing identities " + mi.missing) + println("Environment " + mi.env) + } +} +``` -Whenever a fetch fails, a `FetchFailure` exception is thrown. The `FetchFailure` will have the environment, which gives you information -about the execution of the fetch. +## Your own errors # Syntax @@ -907,7 +974,7 @@ Await.result(task.runAsync(ioSched), Duration.Inf) ## Custom types -If you want to run a fetch to a custom type `M[_]`, you need to implement the `FetchMonadError[M]` typeclass. `FetchMonadError[M]` is simply a `MonadError[M, Throwable]` from cats augmented +If you want to run a fetch to a custom type `M[_]`, you need to implement the `FetchMonadError[M]` typeclass. `FetchMonadError[M]` is simply a `MonadError[M, FetchException]` from cats augmented with a method for running a `Query[A]` in the context of the monad `M[A]`. For ilustrating integration with an asynchronous concurrency monad we'll use the implementation of Monix Task. @@ -978,28 +1045,36 @@ implicit val taskFetchMonadError: FetchMonadError[Task] = new FetchMonadError[Ta override def product[A, B](fa: Task[A], fb: Task[B]): Task[(A, B)] = Task.zip2(Task.fork(fa), Task.fork(fb)) // introduce parallelism with Task#fork - override def pureEval[A](e: Eval[A]): Task[A] = evalToTask(e) - def pure[A](x: A): Task[A] = Task.now(x) - def handleErrorWith[A](fa: Task[A])(f: Throwable => Task[A]): Task[A] = - fa.onErrorHandleWith(f) + def handleErrorWith[A](fa: Task[A])(f: FetchException => Task[A]): Task[A] = + fa.onErrorHandleWith({ case e: FetchException => f(e) }) - def raiseError[A](e: Throwable): Task[A] = + def raiseError[A](e: FetchException): Task[A] = Task.raiseError(e) def flatMap[A, B](fa: Task[A])(f: A => Task[B]): Task[B] = fa.flatMap(f) + def tailRecM[A, B](a: A)(f: A => Task[Either[A, B]]): Task[B] = + defaultTailRecM(a)(f) // same implementation as monix.cats + override def runQuery[A](q: Query[A]): Task[A] = queryToTask(q) } ``` +```tut:silent +import cats.RecursiveTailRecM + +val taskTR: RecursiveTailRecM[Task] = + RecursiveTailRecM.create[Task] +``` + We can now import the above implicit and run a fetch to our custom type, let's give it a go: ```tut:book -val task = Fetch.run(homePage)(taskFetchMonadError) +val task = Fetch.run(homePage)(taskFetchMonadError, taskTR) Await.result(task.runAsync(scheduler), Duration.Inf) ``` @@ -1016,7 +1091,7 @@ Await.result(task.runAsync(scheduler), Duration.Inf) Fetch stands on the shoulders of giants: - [Haxl](https://github.com/facebook/haxl) is Facebook's implementation (Haskell) of the [original paper Fetch is based on](http://community.haskell.org/~simonmar/papers/haxl-icfp14.pdf). -- [Clump](http://getclump.io) has inspired the signature of the `DataSource#fetch` method. +- [Clump](http://getclump.io) has inspired the signature of the `DataSource#fetch*` methods. - [Stitch](https://engineering.twitter.com/university/videos/introducing-stitch) is an in-house Twitter library that is not open source but has inspired Fetch's high-level API. - [Cats](http://typelevel.org/cats/), a library for functional programming in Scala. - [Monix](https://monix.io) high-performance and multiplatform (Scala / Scala.js) asynchronous programming library. diff --git a/docs/src/tut/index.md b/docs/src/tut/index.md index 1cfa61b5..772720ef 100644 --- a/docs/src/tut/index.md +++ b/docs/src/tut/index.md @@ -63,7 +63,7 @@ We'll implement a dummy data source that can convert integers to strings. For co ```tut:silent import cats.data.NonEmptyList -import cats.std.list._ +import cats.instances.list._ import fetch._ implicit object ToStringSource extends DataSource[Int, String]{ @@ -76,7 +76,7 @@ implicit object ToStringSource extends DataSource[Int, String]{ override def fetchMany(ids: NonEmptyList[Int]): Query[Map[Int, String]] = { Query.sync({ println(s"[${Thread.currentThread.getId}] Many ToString $ids") - ids.unwrap.map(i => (i, i.toString)).toMap + ids.toList.map(i => (i, i.toString)).toMap }) } } @@ -144,7 +144,7 @@ implicit object LengthSource extends DataSource[String, Int]{ override def fetchMany(ids: NonEmptyList[String]): Query[Map[String, Int]] = { Query.async((ok, fail) => { println(s"[${Thread.currentThread.getId}] Many Length $ids") - ok(ids.unwrap.map(i => (i, i.size)).toMap) + ok(ids.toList.map(i => (i, i.size)).toMap) }) } } diff --git a/jvm/src/main/scala/unsafeImplicits.scala b/jvm/src/main/scala/unsafeImplicits.scala index 8a939ff0..8e2711d2 100644 --- a/jvm/src/main/scala/unsafeImplicits.scala +++ b/jvm/src/main/scala/unsafeImplicits.scala @@ -18,14 +18,16 @@ package fetch.unsafe import fetch._ -import cats.{Id, Eval} +import cats.{Id, Eval, FlatMap} import cats.data.Xor import scala.concurrent._ import scala.concurrent.duration._ object implicits { - implicit val evalFetchMonadError: FetchMonadError[Eval] = new FetchMonadError[Eval] { + implicit def evalFetchMonadError( + implicit FM: FlatMap[Eval] + ): FetchMonadError[Eval] = new FetchMonadError[Eval] { override def runQuery[A](j: Query[A]): Eval[A] = j match { case Sync(e) => e case Ap(qf, qx) => ap(runQuery(qf))(runQuery(qx)) @@ -53,20 +55,31 @@ object implicits { } } def pure[A](x: A): Eval[A] = Eval.now(x) - def handleErrorWith[A](fa: Eval[A])(f: Throwable => Eval[A]): Eval[A] = + + def tailRecM[A, B](a: A)(f: A => Eval[Either[A, B]]): Eval[B] = FM.tailRecM(a)(f) + + def handleErrorWith[A](fa: Eval[A])(f: FetchException => Eval[A]): Eval[A] = Eval.later({ try { fa.value } catch { - case ex: Throwable => f(ex).value + case ex: FetchException => f(ex).value + case th: Throwable => f(UnhandledException(th)).value } }) - def raiseError[A](e: Throwable): Eval[A] = Eval.later({ throw e }) + + def raiseError[A](e: FetchException): Eval[A] = + Eval.later({ + throw e + }) + def flatMap[A, B](fa: Eval[A])(f: A => Eval[B]): Eval[B] = fa.flatMap(f) } - implicit val idFetchMonadError: FetchMonadError[Id] = new FetchMonadError[Id] { + implicit def idFetchMonadError( + implicit FM: FlatMap[Id] + ): FetchMonadError[Id] = new FetchMonadError[Id] { override def runQuery[A](j: Query[A]): Id[A] = j match { case Sync(e) => e.value case Ap(qf, qx) => ap(runQuery(qf))(runQuery(qx)) @@ -92,14 +105,23 @@ object implicits { } } } - def pure[A](x: A): Id[A] = x - def handleErrorWith[A](fa: Id[A])(f: Throwable => Id[A]): Id[A] = + def pure[A](x: A): Id[A] = x + def tailRecM[A, B](a: A)(f: A => cats.Id[Either[A, B]]): cats.Id[B] = FM.tailRecM(a)(f) + + def handleErrorWith[A](fa: Id[A])(f: FetchException => Id[A]): Id[A] = try { fa } catch { - case ex: Throwable => f(ex) + case ex: FetchException => f(ex) + } + def raiseError[A](e: FetchException): Id[A] = + e match { + case UnhandledException(ex) => { + e.initCause(ex) + throw e + } + case other => throw other } - def raiseError[A](e: Throwable): Id[A] = throw e def flatMap[A, B](fa: Id[A])(f: A => Id[B]): Id[B] = f(fa) } } diff --git a/monix/shared/src/main/scala/monix.scala b/monix/shared/src/main/scala/monix.scala index 39e56f19..60077e54 100644 --- a/monix/shared/src/main/scala/monix.scala +++ b/monix/shared/src/main/scala/monix.scala @@ -18,7 +18,7 @@ package fetch.monixTask import fetch._ -import cats.{Eval, Now, Later, Always, Traverse, Applicative} +import cats.{Eval, Now, Later, Always, Monad, RecursiveTailRecM} import monix.eval.Task import monix.execution.{Scheduler, Cancelable} @@ -29,10 +29,12 @@ object implicits { def evalToTask[A](e: Eval[A]): Task[A] = e match { case Now(x) => Task.now(x) case l: Later[A] => Task.evalOnce({ l.value }) - case a: Always[A] => Task.evalAlways({ a.value }) + case a: Always[A] => Task.eval({ a.value }) case other => Task.evalOnce({ other.value }) } + implicit val fetchTaskRecursiveTailRecM: RecursiveTailRecM[Task] = RecursiveTailRecM.create[Task] + implicit val fetchTaskFetchMonadError: FetchMonadError[Task] = new FetchMonadError[Task] { override def map[A, B](fa: Task[A])(f: A => B): Task[B] = fa.map(f) @@ -40,20 +42,23 @@ object implicits { override def product[A, B](fa: Task[A], fb: Task[B]): Task[(A, B)] = Task.zip2(Task.fork(fa), Task.fork(fb)) - override def pureEval[A](e: Eval[A]): Task[A] = evalToTask(e) - def pure[A](x: A): Task[A] = Task.now(x) - def handleErrorWith[A](fa: Task[A])(f: Throwable => Task[A]): Task[A] = - fa.onErrorHandleWith(f) + def handleErrorWith[A](fa: Task[A])(f: FetchException => Task[A]): Task[A] = + fa.onErrorHandleWith({ + case e: FetchException => f(e) + }) - def raiseError[A](e: Throwable): Task[A] = + def raiseError[A](e: FetchException): Task[A] = Task.raiseError(e) def flatMap[A, B](fa: Task[A])(f: A => Task[B]): Task[B] = fa.flatMap(f) + def tailRecM[A, B](a: A)(f: A => Task[Either[A, B]]): Task[B] = + defaultTailRecM(a)(f) + override def runQuery[A](q: Query[A]): Task[A] = q match { case Sync(x) => evalToTask(x) case Async(ac, timeout) => { diff --git a/monix/shared/src/test/scala/FetchTaskTests.scala b/monix/shared/src/test/scala/FetchTaskTests.scala index ee90c7ae..57812177 100644 --- a/monix/shared/src/test/scala/FetchTaskTests.scala +++ b/monix/shared/src/test/scala/FetchTaskTests.scala @@ -20,7 +20,7 @@ import monix.execution.Scheduler import org.scalatest._ import cats.data.NonEmptyList -import cats.std.list._ +import cats.instances.list._ import fetch._ import fetch.monixTask.implicits._ @@ -28,8 +28,7 @@ import fetch.monixTask.implicits._ import scala.concurrent.Future class FetchTaskTests extends AsyncFreeSpec with Matchers { - implicit def executionContext = Scheduler.Implicits.global - override def newInstance = new FetchTaskTests + implicit override def executionContext = Scheduler.Implicits.global case class ArticleId(id: Int) case class Article(id: Int, content: String) { diff --git a/project/plugins.sbt b/project/plugins.sbt index a440d3ab..b5ab9465 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,4 @@ -addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.8") +addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.13") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "1.5.1") addSbtPlugin("com.typesafe.sbt" % "sbt-site" % "1.0.0") addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.4" exclude("com.typesafe.sbt", "sbt-git")) diff --git a/shared/src/main/scala/datasource.scala b/shared/src/main/scala/datasource.scala index 34bab402..37db34d0 100644 --- a/shared/src/main/scala/datasource.scala +++ b/shared/src/main/scala/datasource.scala @@ -17,7 +17,7 @@ package fetch import cats.data.NonEmptyList -import cats.std.list._ +import cats.instances.list._ import cats.syntax.functor._ import cats.syntax.traverse._ @@ -29,7 +29,8 @@ trait DataSource[I, A] { /** The name of the data source. */ - def name: DataSourceName = this.toString + def name: DataSourceName = this.getClass.getName + override def toString: String = name /** * Derive a `DataSourceIdentity` from an identity, suitable for storing the result @@ -50,7 +51,7 @@ trait DataSource[I, A] { * source doesn't support batching. */ def batchingNotSupported(ids: NonEmptyList[I]): Query[Map[I, A]] = { - val idsList = ids.unwrap + val idsList = ids.toList idsList .map(fetchOne) .sequence diff --git a/shared/src/main/scala/env.scala b/shared/src/main/scala/env.scala index b87bcf46..0e28a2b2 100644 --- a/shared/src/main/scala/env.scala +++ b/shared/src/main/scala/env.scala @@ -23,20 +23,9 @@ import scala.collection.immutable._ * cache and the list of rounds that have been executed. */ trait Env { - def cache: DataSourceCache def rounds: Seq[Round] - - def cached: Seq[Round] = - rounds.filter(_.cached) - - def uncached: Seq[Round] = - rounds.filterNot(_.cached) - - def next( - newCache: DataSourceCache, - newRound: Round, - newIds: List[Any] - ): Env + def cache: DataSourceCache + def evolve(newRound: Round, newCache: DataSourceCache): Env } /** @@ -44,38 +33,26 @@ trait Env { */ case class Round( cache: DataSourceCache, - ds: DataSourceName, - kind: RoundKind, - startRound: Long, - endRound: Long, - cached: Boolean = false + request: FetchRequest, + response: Any, + start: Long, + end: Long ) { - def duration: Double = (endRound - startRound) / 1e6 - - def isConcurrent: Boolean = kind match { - case ConcurrentRound(_) => true - case _ => false - } + def duration: Double = (end - start) / 1e6 } -sealed trait RoundKind -final case class OneRound(id: Any) extends RoundKind -final case class ManyRound(ids: List[Any]) extends RoundKind -final case class ConcurrentRound(ids: Map[String, List[Any]]) extends RoundKind - /** * A concrete implementation of `Env` used in the default Fetch interpreter. */ case class FetchEnv( cache: DataSourceCache, - ids: List[Any] = Nil, rounds: Queue[Round] = Queue.empty ) extends Env { - def next( - newCache: DataSourceCache, + + def evolve( newRound: Round, - newIds: List[Any] + newCache: DataSourceCache ): FetchEnv = - copy(cache = newCache, rounds = rounds :+ newRound, ids = newIds) + copy(rounds = rounds :+ newRound, cache = newCache) } diff --git a/shared/src/main/scala/fetch.scala b/shared/src/main/scala/fetch.scala index f44ecdaa..e176ad26 100644 --- a/shared/src/main/scala/fetch.scala +++ b/shared/src/main/scala/fetch.scala @@ -18,11 +18,14 @@ package fetch import scala.collection.immutable.Map -import cats.{Applicative, Monad, ApplicativeError, MonadError, ~>, Eval} -import cats.data.{StateT, Const, NonEmptyList} -import cats.free.{Free} -import cats.std.list._ -import cats.std.option._ +import cats.{Applicative, Monad, ApplicativeError, MonadError, ~>, Eval, RecursiveTailRecM} +import cats.data.{StateT, Const, NonEmptyList, Writer, XorT} +import cats.free.Free +import cats.instances.list._ +import cats.instances.map._ +import cats.instances.option._ +import cats.syntax.foldable._ +import cats.syntax.functor._ import cats.syntax.traverse._ import scala.concurrent.duration.Duration @@ -51,8 +54,7 @@ object Query { ): Query[A] = Async(action, timeout) implicit val fetchQueryApplicative: Applicative[Query] = new Applicative[Query] { - override def pureEval[A](e: Eval[A]): Query[A] = Sync(e) - def pure[A](x: A): Query[A] = Sync(Eval.now(x)) + def pure[A](x: A): Query[A] = Sync(Eval.now(x)) def ap[A, B](ff: Query[A => B])(fa: Query[A]): Query[B] = Ap(ff, fa) } @@ -60,22 +62,31 @@ object Query { /** Requests in Fetch Free monad. */ -sealed trait FetchRequest[I, A] extends Product with Serializable { +sealed trait FetchRequest extends Product with Serializable { def fullfilledBy(cache: DataSourceCache): Boolean +} + +sealed trait FetchQuery[I, A] extends FetchRequest { def missingIdentities(cache: DataSourceCache): List[I] def dataSource: DataSource[I, A] def identities: NonEmptyList[I] } +trait FetchException extends Throwable with Product with Serializable +case class NotFound(env: Env, request: FetchOne[_, _]) extends FetchException +case class MissingIdentities(env: Env, missing: Map[DataSourceName, List[Any]]) + extends FetchException +case class UnhandledException(err: Throwable) extends FetchException + /** * Primitive operations in the Fetch Free monad. */ sealed abstract class FetchOp[A] extends Product with Serializable -final case class Cached[A](a: A) extends FetchOp[A] +final case class Fetched[A](a: A) extends FetchOp[A] final case class FetchOne[I, A](a: I, ds: DataSource[I, A]) extends FetchOp[A] - with FetchRequest[I, A] { + with FetchQuery[I, A] { override def fullfilledBy(cache: DataSourceCache): Boolean = { cache.get[A](ds.identity(a)).isDefined } @@ -85,20 +96,28 @@ final case class FetchOne[I, A](a: I, ds: DataSource[I, A]) override def dataSource: DataSource[I, A] = ds override def identities: NonEmptyList[I] = NonEmptyList(a, Nil) } + final case class FetchMany[I, A](as: NonEmptyList[I], ds: DataSource[I, A]) extends FetchOp[List[A]] - with FetchRequest[I, A] { + with FetchQuery[I, A] { override def fullfilledBy(cache: DataSourceCache): Boolean = { as.forall((i: I) => cache.get[A](ds.identity(i)).isDefined) } + override def missingIdentities(cache: DataSourceCache): List[I] = { - as.unwrap.distinct.filterNot(i => cache.get[A](ds.identity(i)).isDefined) + as.toList.distinct.filterNot(i => cache.get[A](ds.identity(i)).isDefined) } override def dataSource: DataSource[I, A] = ds override def identities: NonEmptyList[I] = as } -final case class Concurrent(as: List[FetchRequest[_, _]]) extends FetchOp[DataSourceCache] -final case class FetchError[A](err: Throwable) extends FetchOp[A] +final case class Concurrent(as: List[FetchQuery[_, _]]) + extends FetchOp[DataSourceCache] + with FetchRequest { + override def fullfilledBy(cache: DataSourceCache): Boolean = { + as.forall(_.fullfilledBy(cache)) + } +} +final case class Thrown[A](err: Throwable) extends FetchOp[A] object `package` { type DataSourceName = String @@ -106,7 +125,7 @@ object `package` { type Fetch[A] = Free[FetchOp, A] - trait FetchMonadError[M[_]] extends MonadError[M, Throwable] { + trait FetchMonadError[M[_]] extends MonadError[M, FetchException] { def runQuery[A](q: Query[A]): M[A] } @@ -123,6 +142,12 @@ object `package` { def ap[A, B](ff: Fetch[A => B])(fa: Fetch[A]): Fetch[B] = Fetch.join(ff, fa).map({ case (f, a) => f(a) }) + + override def product[A, B](fa: Fetch[A], fb: Fetch[B]): Fetch[(A, B)] = + Fetch.join(fa, fb) + + override def tuple2[A, B](fa: Fetch[A], fb: Fetch[B]): Fetch[(A, B)] = + Fetch.join(fa, fb) } object Fetch extends FetchInterpreters { @@ -134,10 +159,10 @@ object `package` { Free.pure(a) /** - * Lift an error to the Fetch monad. + * Lift an exception to the Fetch monad. */ def error[A](e: Throwable): Fetch[A] = - Free.liftF(FetchError(e)) + Free.liftF(Thrown(e)) /** * Given a value that has a related `DataSource` implementation, lift it @@ -149,130 +174,133 @@ object `package` { ): Fetch[A] = Free.liftF(FetchOne[I, A](i, DS)) - type FM = List[FetchOp[_]] - private[this] val DM: Monad[Const[FM, ?]] = new Monad[Const[FM, ?]] { - def pure[A](x: A): Const[FM, A] = Const(List()) - - def flatMap[A, B](fa: Const[FM, A])(f: A => Const[FM, B]): Const[FM, B] = fa match { - case Const(List(Cached(a))) => f(a.asInstanceOf[A]) - case other => fa.asInstanceOf[Const[FM, B]] - } - } - - private[this] def deps[A](f: Fetch[_]): List[FetchRequest[_, _]] = { - f.foldMap[Const[FM, ?]](new (FetchOp ~> Const[FM, ?]) { - def apply[X](x: FetchOp[X]): Const[FM, X] = x match { - case one @ FetchOne(id, ds) => Const(List(one)) - case conc @ Concurrent(as) => Const(as.asInstanceOf[FM]) - case cach @ Cached(a) => Const(List(cach)) - case _ => Const(List()) - } - })(DM) - .getConst - .collect({ - case one @ FetchOne(_, _) => one - case many @ FetchMany(_, _) => many - }) - } - - private[this] def combineDeps(ds: List[FetchRequest[_, _]]): List[FetchRequest[_, _]] = { - ds.foldLeft(Map.empty[DataSource[_, _], NonEmptyList[Any]])((acc, op) => - op match { - case one @ FetchOne(id, ds) => - acc.updated(ds, - acc - .get(ds) - .fold(NonEmptyList(id): NonEmptyList[Any])(accids => - accids.combine(NonEmptyList(id)))) - case many @ FetchMany(ids, ds) => - acc.updated(ds, - acc - .get(ds) - .fold(ids.asInstanceOf[NonEmptyList[Any]])(accids => - accids.combine(ids.asInstanceOf[NonEmptyList[Any]]))) - case _ => acc - }) - .toList - .map({ - case (ds, ids) if ids.unwrap.size == 1 => - FetchOne[Any, Any](ids.head, ds.asInstanceOf[DataSource[Any, Any]]) - case (ds, ids) => - FetchMany[Any, Any](ids, ds.asInstanceOf[DataSource[Any, Any]]) - }) - } - - private[this] def concurrently(fa: Fetch[_], fb: Fetch[_]): Fetch[DataSourceCache] = { - val fetches: List[FetchRequest[_, _]] = combineDeps(deps(fa) ++ deps(fb)) + /** + * Given a list of `FetchRequest`s, lift it to the `Fetch` monad. When executing + * the fetch, data sources will be queried and the fetch will return a `DataSourceCache` + * containing the results. + */ + private[this] def concurrently(fetches: List[FetchQuery[_, _]]): Fetch[DataSourceCache] = Free.liftF(Concurrent(fetches)) - } /** * Transform a list of fetches into a fetch of a list. It implies concurrent execution of fetches. */ - def sequence[I, A](ids: List[Fetch[A]]): Fetch[List[A]] = { - ids.foldLeft(Fetch.pure(List(): List[A]))((f, newF) => - Fetch.join(f, newF).map(t => t._1 :+ t._2)) - } + def sequence[I, A](ids: List[Fetch[A]]): Fetch[List[A]] = + Applicative[Fetch].sequence(ids) /** * Apply a fetch-returning function to every element in a list and return a Fetch of the list of * results. It implies concurrent execution of fetches. */ def traverse[A, B](ids: List[A])(f: A => Fetch[B]): Fetch[List[B]] = - sequence(ids.map(f)) + Applicative[Fetch].traverse(ids)(f) /** * Apply the given function to the result of the two fetches. It implies concurrent execution of fetches. */ def map2[A, B, C](f: (A, B) => C)(fa: Fetch[A], fb: Fetch[B]): Fetch[C] = - Fetch.join(fa, fb).map({ case (a, b) => f(a, b) }) + Applicative[Fetch].map2(fa, fb)(f) + + /** + * Join two fetches from any data sources and return a Fetch that returns a tuple with the two + * results. It implies concurrent execution of fetches. + */ + def join[A, B](fl: Fetch[A], fr: Fetch[B]): Fetch[(A, B)] = { + def depFetches(fa: Fetch[_], fb: Fetch[_]): List[FetchQuery[_, _]] = + combineQueries(dependentQueries(fa) ++ dependentQueries(fb)) + + def joinWithFetches( + fl: Fetch[A], fr: Fetch[B], fetches: List[FetchQuery[_, _]]): Fetch[(A, B)] = + concurrently(fetches).flatMap(cache => joinH(fl, fr, cache)) - private[this] def simplify(results: DataSourceCache): (FetchOp ~> FetchOp) = { + def joinH(fl: Fetch[A], fr: Fetch[B], cache: DataSourceCache): Fetch[(A, B)] = { + val sfl = fl.compile(simplify(cache)) + val sfr = fr.compile(simplify(cache)) + + val remainingDeps = depFetches(sfl, sfr) + + if (remainingDeps.isEmpty) Monad[Fetch].tuple2(sfl, sfr) + else joinWithFetches(sfl, sfr, remainingDeps) + } + + joinWithFetches(fl, fr, depFetches(fl, fr)) + } + + /** + * Use a `DataSourceCache` to optimize a `FetchOp`. + * If the cache contains all the fetch identities, the fetch doesn't need to be + * executed and can be replaced by cached results. + */ + private[this] def simplify(cache: DataSourceCache): (FetchOp ~> FetchOp) = { new (FetchOp ~> FetchOp) { - def apply[B](f: FetchOp[B]): FetchOp[B] = f match { - case one @ FetchOne(id, ds) => { - results.get[B](ds.identity(id)).fold(one: FetchOp[B])(b => Cached(b)) - } - case many @ FetchMany(ids, ds) => { - val fetched = ids.map(id => results.get(ds.identity(id))).unwrap.sequence - fetched.fold(many: FetchOp[B])(results => Cached(results)) - } - case conc @ Concurrent(manies) => { - val newManies = manies.filterNot(_.fullfilledBy(results)) - - if (newManies.isEmpty) - Cached(results).asInstanceOf[FetchOp[B]] - else - Concurrent(newManies).asInstanceOf[FetchOp[B]] - } + def apply[B](fetchOp: FetchOp[B]): FetchOp[B] = fetchOp match { + case one @ FetchOne(id, ds) => + cache.get[B](ds.identity(id)).fold(fetchOp)(b => Fetched(b)) + case many @ FetchMany(ids, ds) => + val fetched = ids.traverse(id => cache.get(ds.identity(id))) + fetched.fold(fetchOp)(results => Fetched(results.toList)) + case conc @ Concurrent(manies) => + val newManies = manies.filterNot(_.fullfilledBy(cache)) + (if (newManies.isEmpty) Fetched(cache) else Concurrent(newManies)): FetchOp[B] case other => other } } } /** - * Join two fetches from any data sources and return a Fetch that returns a tuple with the two - * results. It implies concurrent execution of fetches. + * Combine multiple queries so the resulting `List` only contains one `FetchQuery` + * per `DataSource`. */ - def join[A, B](fl: Fetch[A], fr: Fetch[B]): Fetch[(A, B)] = { - for { - cache <- concurrently(fl, fr) - result <- { - val sfl = fl.compile(simplify(cache)) - val sfr = fr.compile(simplify(cache)) - - val remainingDeps = combineDeps(deps(sfl) ++ deps(sfr)) - - if (remainingDeps.isEmpty) { - for { - a <- sfl - b <- sfr - } yield (a, b) - } else { - join[A, B](sfl, sfr) - } + private[this] def combineQueries(ds: List[FetchQuery[_, _]]): List[FetchQuery[_, _]] = + ds.foldMap[Map[DataSource[_, _], NonEmptyList[Any]]] { + case FetchOne(id, ds) => Map(ds -> NonEmptyList.of[Any](id)) + case FetchMany(ids, ds) => Map(ds -> ids.widen[Any]) + } + .mapValues { nel => + // workaround because NEL[Any].distinct needs Order[Any] + NonEmptyList.fromListUnsafe(nel.toList.distinct) } - } yield result + .toList + .map { + case (ds, NonEmptyList(id, Nil)) => FetchOne(id, ds.castDS[Any, Any]) + case (ds, ids) => FetchMany(ids, ds.castDS[Any, Any]) + } + + private[this] type FetchOps = List[FetchOp[_]] + private[this] type KeepFetches[A] = Writer[FetchOps, A] + private[this] type AnalyzeTop[A] = XorT[KeepFetches, Unit, A] + + private[this] object AnalyzeTop { + def stopWith[R](list: FetchOps): AnalyzeTop[R] = + AnalyzeTop.stop(Writer.tell(list)) + + def stopEmpty[R]: AnalyzeTop[R] = + AnalyzeTop.stop(Writer.value(())) + + def stop[R](k: KeepFetches[Unit]): AnalyzeTop[R] = + XorT.left[KeepFetches, Unit, R](k) + + def go[X](k: KeepFetches[X]): AnalyzeTop[X] = + XorT.right[KeepFetches, Unit, X](k) + } + + /** + * Get a list of dependent `FetchQuery`s for a given `Fetch`. + */ + private[this] def dependentQueries(f: Fetch[_]): List[FetchQuery[_, _]] = { + val analyzeTop: FetchOp ~> AnalyzeTop = new (FetchOp ~> AnalyzeTop) { + def apply[A](op: FetchOp[A]): AnalyzeTop[A] = op match { + case fetc @ Fetched(c) => AnalyzeTop.go(Writer(List(fetc), c)) + case one @ FetchOne(_, _) => AnalyzeTop.stopWith(List(one)) + case conc @ Concurrent(as) => AnalyzeTop.stopWith(as.asInstanceOf[FetchOps]) + case _ => AnalyzeTop.stopEmpty + } + } + + f.foldMap[AnalyzeTop](analyzeTop).value.written.collect { + case one @ FetchOne(_, _) => one + case many @ FetchMany(_, _) => many + } } class FetchRunner[M[_]] { @@ -280,7 +308,8 @@ object `package` { fa: Fetch[A], cache: DataSourceCache = InMemoryCache.empty )( - implicit MM: FetchMonadError[M] + implicit MM: FetchMonadError[M], + TR: RecursiveTailRecM[M] ): M[(FetchEnv, A)] = fa.foldMap[FetchInterpreter[M]#f](interpreter).run(FetchEnv(cache)) } @@ -296,7 +325,8 @@ object `package` { fa: Fetch[A], cache: DataSourceCache = InMemoryCache.empty )( - implicit MM: FetchMonadError[M] + implicit MM: FetchMonadError[M], + TR: RecursiveTailRecM[M] ): M[FetchEnv] = fa.foldMap[FetchInterpreter[M]#f](interpreter).runS(FetchEnv(cache)) } @@ -311,7 +341,8 @@ object `package` { fa: Fetch[A], cache: DataSourceCache = InMemoryCache.empty )( - implicit MM: FetchMonadError[M] + implicit MM: FetchMonadError[M], + TR: RecursiveTailRecM[M] ): M[A] = fa.foldMap[FetchInterpreter[M]#f](interpreter).runA(FetchEnv(cache)) } @@ -321,4 +352,8 @@ object `package` { */ def run[M[_]]: FetchRunnerA[M] = new FetchRunnerA[M] } + + private[fetch] implicit class DataSourceCast[A, B](ds: DataSource[A, B]) { + def castDS[C, D]: DataSource[C, D] = ds.asInstanceOf[DataSource[C, D]] + } } diff --git a/shared/src/main/scala/implicits.scala b/shared/src/main/scala/implicits.scala index 6d49ad5f..463d52cc 100644 --- a/shared/src/main/scala/implicits.scala +++ b/shared/src/main/scala/implicits.scala @@ -16,17 +16,21 @@ package fetch -import cats.{Eval, MonadError} -import cats.std.FutureInstances +import cats.{Eval, MonadError, FlatMap} +import cats.instances.FutureInstances import scala.concurrent.{Promise, Future, ExecutionContext} object implicits extends FutureInstances { implicit def fetchFutureFetchMonadError( implicit ec: ExecutionContext, - ME: MonadError[Future, Throwable] + ME: MonadError[Future, Throwable], + FM: FlatMap[Future] ): FetchMonadError[Future] = new FetchMonadError[Future] { + override def tailRecM[A, B](a: A)( + f: A => scala.concurrent.Future[Either[A, B]]): scala.concurrent.Future[B] = + FM.tailRecM(a)(f) override def runQuery[A](j: Query[A]): Future[A] = j match { - case Sync(e) => ME.pureEval(e) + case Sync(e) => Future(e.value) case Async(ac, timeout) => { val p = Promise[A]() @@ -44,9 +48,9 @@ object implicits extends FutureInstances { }) } def pure[A](x: A): Future[A] = Future.successful(x) - def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] = - fa.recoverWith({ case t => f(t) }) - def raiseError[A](e: Throwable): Future[A] = Future.failed(e) + def handleErrorWith[A](fa: Future[A])(f: FetchException => Future[A]): Future[A] = + fa.recoverWith({ case t: FetchException => f(t) }) + def raiseError[A](e: FetchException): Future[A] = Future.failed(e) def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f) } } diff --git a/shared/src/main/scala/interpreters.scala b/shared/src/main/scala/interpreters.scala index 364aebc2..273d2fa2 100644 --- a/shared/src/main/scala/interpreters.scala +++ b/shared/src/main/scala/interpreters.scala @@ -19,240 +19,186 @@ package fetch import scala.collection.immutable._ import cats.{MonadError, ~>} -import cats.data.{StateT, NonEmptyList} -import cats.std.option._ -import cats.std.list._ +import cats.data.{OptionT, NonEmptyList, StateT, Validated, XorT} +import cats.instances.option._ +import cats.instances.list._ +import cats.instances.map._ +import cats.syntax.either._ +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import cats.syntax.functorFilter._ +import cats.syntax.option._ import cats.syntax.traverse._ - -/** - * An exception thrown from the interpreter when failing to perform a data fetch. - */ -case class FetchFailure(env: Env) extends Throwable +import cats.syntax.validated._ trait FetchInterpreters { - - def pendingRequests( - requests: List[FetchRequest[_, _]], cache: DataSourceCache): List[FetchRequest[Any, Any]] = { - requests - .filterNot(_.fullfilledBy(cache)) - .map(req => { - (req.dataSource, req.missingIdentities(cache)) - }) - .collect({ - case (ds, ids) if ids.size == 1 => - FetchOne[Any, Any](ids.head, ds.asInstanceOf[DataSource[Any, Any]]) - case (ds, ids) if ids.size > 1 => - FetchMany[Any, Any]( - NonEmptyList(ids(0), ids.tail), ds.asInstanceOf[DataSource[Any, Any]]) - }) - } + def pendingQueries( + queries: List[FetchQuery[_, _]], + cache: DataSourceCache + ): List[FetchQuery[Any, Any]] = + queries.mapFilter { query => + val dsAny = query.dataSource.castDS[Any, Any] + NonEmptyList.fromList(query.missingIdentities(cache)).map { + case NonEmptyList(id, Nil) => FetchOne(id, dsAny) + case ids => FetchMany(ids.widen[Any], dsAny) + } + } def interpreter[I, M[_]]( implicit M: FetchMonadError[M] ): FetchOp ~> FetchInterpreter[M]#f = { new (FetchOp ~> FetchInterpreter[M]#f) { - def apply[A](fa: FetchOp[A]): FetchInterpreter[M]#f[A] = { + def apply[A](fa: FetchOp[A]): FetchInterpreter[M]#f[A] = StateT[M, FetchEnv, A] { env: FetchEnv => fa match { - case FetchError(e) => M.raiseError(e) - case Cached(a) => M.pure((env, a)) - case Concurrent(concurrentRequests) => { - val startRound = System.nanoTime() - val cache = env.cache + case Thrown(e) => M.raiseError(UnhandledException(e)) + case Fetched(a) => M.pure((env, a)) + case one @ FetchOne(_, _) => processOne(one, env) + case many @ FetchMany(_, _) => processMany(many, env) + case conc @ Concurrent(_) => processConcurrent(conc, env) + } + } + } + } + + private[this] def processOne[M[_], A]( + one: FetchOne[Any, A], + env: FetchEnv + )( + implicit M: FetchMonadError[M] + ): M[(FetchEnv, A)] = { + val FetchOne(id, ds) = one + val startRound = System.nanoTime() + env.cache + .get[A](ds.identity(id)) + .fold[M[(FetchEnv, A)]]( + M.runQuery(ds.fetchOne(id)).flatMap { (res: Option[A]) => + val endRound = System.nanoTime() + res.fold[M[(FetchEnv, A)]] { + // could not get result from datasource + M.raiseError(NotFound(env, one)) + } { result => + // found result (and update cache) + val newCache = env.cache.update(ds.identity(id), result) + val round = Round(env.cache, one, result, startRound, endRound) + M.pure(env.evolve(round, newCache) -> result) + } + } + ) { cached => + // get result from cache + M.pure(env -> cached) + } + } + + private[this] def processMany[M[_], A]( + many: FetchMany[Any, Any], + env: FetchEnv + )( + implicit M: FetchMonadError[M], + ev: List[Any] =:= A + ): M[(FetchEnv, A)] = { + val ids = many.as + val ds = many.ds //.castDS[Any, Any] + val startRound = System.nanoTime() + val cache = env.cache + val newIds = many.missingIdentities(cache) - val requests: List[FetchRequest[Any, Any]] = - pendingRequests(concurrentRequests, cache) + (for { + newIdsNel <- XorT.fromXor[M] { + NonEmptyList.fromList(newIds).toRightXor { + // no missing ids, get all from cache + val cachedResults = ids.toList.mapFilter(id => cache.get(ds.identity(id))) + env -> cachedResults + } + } + resMap <- XorT.right(M.runQuery(ds.fetchMany(newIdsNel))) + results <- ids.toList + .traverseU(id => resMap.get(id).toValidNel(id)) + .fold[XorT[M, (FetchEnv, List[Any]), List[Any]]]({ missingIds => + // not all identities could be found + val map = Map(ds.name -> missingIds.toList) + XorT.left(M.raiseError(MissingIdentities(env, map))) + }, XorT.pure) + } yield { + // found all results (and update cache) + val endRound = System.nanoTime() + val newCache = cache.cacheResults(resMap, ds) + val round = Round(cache, many, results, startRound, endRound) + env.evolve(round, newCache) -> results + }).merge.map { case (env, l) => (env, ev(l)) } // A =:= List[Any] + } - if (requests.isEmpty) - M.pure((env, cache.asInstanceOf[A])) - else { - val sentRequests = M.sequence(requests.map({ - case FetchOne(a, ds) => { - val ident = a.asInstanceOf[I] - val task = M.runQuery(ds.asInstanceOf[DataSource[I, A]].fetchOne(ident)) - M.map(task)((r: Option[A]) => - r.fold(Map.empty[I, A])((result: A) => Map(ident -> result))) - } - case FetchMany(as, ds) => - M.runQuery(ds - .asInstanceOf[DataSource[I, A]] - .fetchMany(as.asInstanceOf[NonEmptyList[I]])) - })) + private[this] def processConcurrent[M[_]]( + concurrent: Concurrent, + env: FetchEnv + )( + implicit M: FetchMonadError[M] + ): M[(FetchEnv, DataSourceCache)] = { + def runFetchQueryAsMap[I, A](op: FetchQuery[I, A]): M[Map[I, A]] = + op match { + case FetchOne(a, ds) => + OptionT(M.runQuery(ds.fetchOne(a))).map(r => Map(a -> r)).getOrElse(Map.empty) + case FetchMany(as, ds) => M.runQuery(ds.fetchMany(as)) + } - M.flatMap(sentRequests)((results: List[Map[_, _]]) => { - val endRound = System.nanoTime() - val newCache = (requests zip results).foldLeft(cache)((accache, resultset) => { - val (req, resultmap) = resultset - val ds = req.dataSource - val tresults = resultmap.asInstanceOf[Map[I, A]] - val tds = ds.asInstanceOf[DataSource[I, A]] - accache.cacheResults[I, A](tresults, tds) - }) - val newEnv = env.next( - newCache, - Round( - cache, - "Concurrent", - ConcurrentRound( - requests - .map({ - case FetchOne(a, ds) => (ds.name, List(a)) - case FetchMany(as, ds) => (ds.name, as.unwrap) - }) - .toMap - ), - startRound, - endRound - ), - Nil - ) + type MissingIdentitiesMap = Map[DataSourceName, List[Any]] - val allFullfilled = (requests zip results).forall({ - case (FetchOne(_, _), results) => results.size == 1 - case (FetchMany(as, _), results) => as.unwrap.size == results.size - case _ => false - }) + // Give for a list of queries and result(maps) all the missing identities + def missingIdentitiesOrAllFulfilled( + queriesAndResults: List[(FetchQuery[Any, Any], Map[Any, Any])] + ): Validated[MissingIdentitiesMap, Unit] = + queriesAndResults.traverseU_ { + case (FetchOne(id, ds), resultMap) => + Either.cond(resultMap.size == 1, (), Map(ds.name -> List(id))).toValidated + case (FetchMany(as, ds), resultMap) => + Either + .cond(as.toList.size == resultMap.size, + (), + Map(ds.name -> as.toList.filter(id => resultMap.get(id).isEmpty))) + .toValidated + case _ => + Map.empty[DataSourceName, List[Any]].invalid + } - if (allFullfilled) { - // since user-provided caches may discard elements, we use an in-memory - // cache to gather these intermediate results that will be used for - // concurrent optimizations. - val cachedResults = - (requests zip results).foldLeft(InMemoryCache.empty)((cach, resultSet) => { - val (req, resultmap) = resultSet - val ds = req.dataSource - val tresults = resultmap.asInstanceOf[Map[I, A]] - val tds = ds.asInstanceOf[DataSource[I, A]] - cach.cacheResults[I, A](tresults, tds).asInstanceOf[InMemoryCache] - }) - M.pure((newEnv, cachedResults.asInstanceOf[A])) - } else { - M.raiseError(FetchFailure(newEnv)) - } - }) - } - } - case FetchOne(id, ds) => { - val startRound = System.nanoTime() - val cache = env.cache - cache - .get[A](ds.identity(id)) - .fold[M[(FetchEnv, A)]]( - M.flatMap(M.runQuery(ds.fetchOne(id)))((res: Option[A]) => { - val endRound = System.nanoTime() - res.fold[M[(FetchEnv, A)]]( - M.raiseError( - FetchFailure( - env.next( - cache, - Round(cache, - ds.name, - OneRound(id), - startRound, - endRound), - List(id) - ) - ) - ) - )(result => { - val endRound = System.nanoTime() - val newCache = cache.update(ds.identity(id), result) - M.pure( - (env.next( - newCache, - Round(cache, - ds.name, - OneRound(id), - startRound, - endRound), - List(id) - ), - result) - ) - }) - }) - )(cached => { - val endRound = System.nanoTime() - M.pure( - (env.next( - cache, - Round(cache, - ds.name, - OneRound(id), - startRound, - endRound, - true), - List(id) - ), - cached) - ) - }) - } - case many @ FetchMany(ids, ds) => { - val startRound = System.nanoTime() - val cache = env.cache - val newIds = many.missingIdentities(cache) - if (newIds.isEmpty) - M.pure( - (env.next( - cache, - Round(cache, - ds.name, - ManyRound(ids.unwrap), - startRound, - System.nanoTime(), - true), - newIds - ), - ids.unwrap.flatMap(id => cache.get(ds.identity(id)))) - ) - else { - M.flatMap(M.runQuery(ds - .asInstanceOf[DataSource[I, A]] - .fetchMany(NonEmptyList(newIds(0).asInstanceOf[I], - newIds.tail.asInstanceOf[List[I]]))))( - (res: Map[I, A]) => { - val endRound = System.nanoTime() - ids.unwrap - .map(i => res.get(i.asInstanceOf[I])) - .sequence - .fold[M[(FetchEnv, A)]]( - M.raiseError( - FetchFailure( - env.next( - cache, - Round(cache, - ds.name, - ManyRound(ids.unwrap), - startRound, - endRound), - newIds - ) - ) - ) - )(results => { - val endRound = System.nanoTime() - val newCache = - cache.cacheResults[I, A](res, ds.asInstanceOf[DataSource[I, A]]) - M.pure( - (env.next( - newCache, - Round(cache, - ds.name, - ManyRound(ids.unwrap), - startRound, - endRound, - results.size < ids.unwrap.distinct.size), - newIds - ), - results) - ) - }) - }) - } - } - } - } + val startRound = System.nanoTime() + val cache = env.cache + + val queries: List[FetchQuery[Any, Any]] = pendingQueries(concurrent.as, cache) + + if (queries.isEmpty) + // there are no pending queries + M.pure((env, cache)) + else { + val sentRequests = queries.traverse(r => runFetchQueryAsMap(r)) + + sentRequests.flatMap { results => + val endRound = System.nanoTime() + val queriesAndResults = queries zip results + + val missingOrFulfilled = missingIdentitiesOrAllFulfilled(queriesAndResults) + + missingOrFulfilled.fold({ missingIds => + // not all identiies were found + M.raiseError(MissingIdentities(env, missingIds)) + }, { _ => + // results found for all identities + val round = Round(cache, Concurrent(queries), results, startRound, endRound) + + // since user-provided caches may discard elements, we use an in-memory + // cache to gather these intermediate results that will be used for + // concurrent optimizations. + val (newCache, cachedResults) = + queriesAndResults.foldLeft((cache, InMemoryCache.empty)) { + case ((userCache, internCache), (req, resultMap)) => + val anyMap = resultMap.asInstanceOf[Map[Any, Any]] + val anyDS = req.dataSource.castDS[Any, Any] + (userCache.cacheResults(anyMap, anyDS), + internCache.cacheResults(anyMap, anyDS).asInstanceOf[InMemoryCache]) + } + + M.pure((env.evolve(round, newCache), cachedResults)) + }) } } } diff --git a/shared/src/main/scala/syntax.scala b/shared/src/main/scala/syntax.scala index e407f8e8..8b140374 100644 --- a/shared/src/main/scala/syntax.scala +++ b/shared/src/main/scala/syntax.scala @@ -16,6 +16,8 @@ package fetch +import cats.RecursiveTailRecM + object syntax { /** Implicit syntax to lift any value to the context of Fetch via pure */ @@ -26,7 +28,7 @@ object syntax { } /** Implicit syntax to lift exception to Fetch errors */ - implicit class FetchErrorSyntax(val a: Throwable) extends AnyVal { + implicit class FetchExceptionSyntax(val a: Throwable) extends AnyVal { def fetch[B]: Fetch[B] = Fetch.error[B](a) @@ -38,22 +40,22 @@ object syntax { def join[B](fb: Fetch[B]): Fetch[(A, B)] = Fetch.join(fa, fb) - def runF[M[_]: FetchMonadError]: M[(FetchEnv, A)] = + def runF[M[_]: FetchMonadError: RecursiveTailRecM]: M[(FetchEnv, A)] = Fetch.runFetch[M](fa, InMemoryCache.empty) - def runE[M[_]: FetchMonadError]: M[FetchEnv] = + def runE[M[_]: FetchMonadError: RecursiveTailRecM]: M[FetchEnv] = Fetch.runEnv[M](fa, InMemoryCache.empty) - def runA[M[_]: FetchMonadError]: M[A] = + def runA[M[_]: FetchMonadError: RecursiveTailRecM]: M[A] = Fetch.run[M](fa, InMemoryCache.empty) - def runF[M[_]: FetchMonadError](cache: DataSourceCache): M[(FetchEnv, A)] = + def runF[M[_]: FetchMonadError: RecursiveTailRecM](cache: DataSourceCache): M[(FetchEnv, A)] = Fetch.runFetch[M](fa, cache) - def runE[M[_]: FetchMonadError](cache: DataSourceCache): M[FetchEnv] = + def runE[M[_]: FetchMonadError: RecursiveTailRecM](cache: DataSourceCache): M[FetchEnv] = Fetch.runEnv[M](fa, cache) - def runA[M[_]: FetchMonadError](cache: DataSourceCache): M[A] = + def runA[M[_]: FetchMonadError: RecursiveTailRecM](cache: DataSourceCache): M[A] = Fetch.run[M](fa, cache) } } diff --git a/shared/src/test/scala/FetchAsyncQueryTests.scala b/shared/src/test/scala/FetchAsyncQueryTests.scala index 17877bf8..3e3dac42 100644 --- a/shared/src/test/scala/FetchAsyncQueryTests.scala +++ b/shared/src/test/scala/FetchAsyncQueryTests.scala @@ -20,13 +20,12 @@ import scala.concurrent.duration._ import org.scalatest._ import cats.data.NonEmptyList -import cats.std.list._ +import cats.instances.list._ import fetch._ import fetch.implicits._ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers { - implicit def executionContext = ExecutionContext.Implicits.global - override def newInstance = new FetchAsyncQueryTests + implicit override def executionContext = ExecutionContext.Implicits.global case class ArticleId(id: Int) case class Article(id: Int, content: String) { diff --git a/shared/src/test/scala/FetchTests.scala b/shared/src/test/scala/FetchTests.scala index 51212a70..afe42978 100644 --- a/shared/src/test/scala/FetchTests.scala +++ b/shared/src/test/scala/FetchTests.scala @@ -21,7 +21,7 @@ import org.scalatest._ import cats.{MonadError} import cats.data.{NonEmptyList, Xor} -import cats.std.list._ +import cats.instances.list._ import fetch._ import fetch.implicits._ @@ -29,7 +29,7 @@ import fetch.implicits._ object TestHelper { import fetch.syntax._ - case class NotFound() extends Throwable + case class DidNotFound() extends Throwable case class One(id: Int) implicit object OneSource extends DataSource[One, Int] { @@ -38,7 +38,7 @@ object TestHelper { Query.sync(Option(id.id)) } override def fetchMany(ids: NonEmptyList[One]): Query[Map[One, Int]] = - Query.sync(ids.unwrap.map(one => (one, one.id)).toMap) + Query.sync(ids.toList.map(one => (one, one.id)).toMap) } def one(id: Int): Fetch[Int] = Fetch(One(id)) @@ -48,7 +48,7 @@ object TestHelper { override def fetchOne(id: AnotherOne): Query[Option[Int]] = Query.sync(Option(id.id)) override def fetchMany(ids: NonEmptyList[AnotherOne]): Query[Map[AnotherOne, Int]] = - Query.sync(ids.unwrap.map(anotherone => (anotherone, anotherone.id)).toMap) + Query.sync(ids.toList.map(anotherone => (anotherone, anotherone.id)).toMap) } def anotherOne(id: Int): Fetch[Int] = Fetch(AnotherOne(id)) @@ -58,8 +58,9 @@ object TestHelper { override def fetchOne(id: Many): Query[Option[List[Int]]] = Query.sync(Option(0 until id.n toList)) override def fetchMany(ids: NonEmptyList[Many]): Query[Map[Many, List[Int]]] = - Query.sync(ids.unwrap.map(m => (m, 0 until m.n toList)).toMap) + Query.sync(ids.toList.map(m => (m, 0 until m.n toList)).toMap) } + def many(id: Int): Fetch[List[Int]] = Fetch(Many(id)) case class Never() implicit object NeverSource extends DataSource[Never, Int] { @@ -69,34 +70,30 @@ object TestHelper { override def fetchMany(ids: NonEmptyList[Never]): Query[Map[Never, Int]] = Query.sync(Map.empty[Never, Int]) } - def many(id: Int): Fetch[List[Int]] = Fetch(Many(id)) + + def requestFetches(r: FetchRequest): Int = + r match { + case FetchOne(_, _) => 1 + case FetchMany(ids, _) => ids.toList.size + case Concurrent(requests) => requests.map(requestFetches).sum + } def totalFetched(rs: Seq[Round]): Int = - rs.filterNot(_.cached) - .foldLeft(0)((acc, round) => - round.kind match { - case OneRound(_) => acc + 1 - case ManyRound(ids) => acc + ids.size - case ConcurrentRound(ids) => acc + ids.map(_._2.size).sum - }) + rs.map((round: Round) => requestFetches(round.request)).sum + + def requestBatches(r: FetchRequest): Int = + r match { + case FetchOne(_, _) => 0 + case FetchMany(ids, _) => 1 + case Concurrent(requests) => + requests.count { + case FetchMany(_, _) => true + case _ => false + } + } def totalBatches(rs: Seq[Round]): Int = - rs.filterNot(_.cached) - .foldLeft(0)((acc, round) => - round.kind match { - case OneRound(_) => acc - case ManyRound(ids) => acc + 1 - case ConcurrentRound(ids) => acc + ids.filter(_._2.size > 1).size - }) - - def concurrent(rs: Seq[Round]): Seq[Round] = - rs.filter( - r => - r.kind match { - case ConcurrentRound(_) => true - case other => false - } - ) + rs.map((round: Round) => requestBatches(round.request)).sum } class FetchSyntaxTests extends AsyncFreeSpec with Matchers { @@ -105,8 +102,7 @@ class FetchSyntaxTests extends AsyncFreeSpec with Matchers { val ME = implicitly[FetchMonadError[Future]] - implicit def executionContext = ExecutionContext.Implicits.global - override def newInstance = new FetchSyntaxTests + implicit override def executionContext = ExecutionContext.Implicits.global "Cartesian syntax is implicitly concurrent" in { import cats.syntax.cartesian._ @@ -116,7 +112,7 @@ class FetchSyntaxTests extends AsyncFreeSpec with Matchers { val fut = Fetch.runEnv[Future](fetch) fut.map(env => { - concurrent(env.rounds).size shouldEqual 1 + env.rounds.size shouldEqual 1 }) } @@ -130,9 +126,10 @@ class FetchSyntaxTests extends AsyncFreeSpec with Matchers { fut.map( env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalBatches(rounds), totalFetched(rounds)) - stats shouldEqual (1, 1, 2) + rounds.size shouldEqual 1 + totalBatches(rounds) shouldEqual 1 + totalFetched(rounds) shouldEqual 2 }) } @@ -189,8 +186,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { val ME = implicitly[FetchMonadError[Future]] - implicit def executionContext = ExecutionContext.Implicits.global - override def newInstance = new FetchTests + implicit override def executionContext = ExecutionContext.Implicits.global "We can lift plain values to Fetch" in { val fetch: Fetch[Int] = Fetch.pure(42) @@ -203,14 +199,8 @@ class FetchTests extends AsyncFreeSpec with Matchers { ME.attempt(fut) .map(xor => - xor match { - case Xor.Left(FetchFailure(env)) => { - env.rounds.headOption match { - case Some(Round(_, _, OneRound(Never()), _, _, _)) => assert(true) - case _ => fail("Should've thrown a fetch failure") - } - } - case _ => fail("Should've thrown a fetch failure") + xor should matchPattern { + case Xor.Left(NotFound(env, FetchOne(Never(), _))) => }) } @@ -229,8 +219,8 @@ class FetchTests extends AsyncFreeSpec with Matchers { ME.attempt(Fetch.run[Future](fetch, cache)) .map(xor => xor match { - case Xor.Left(FetchFailure(env)) => env.cache shouldEqual cache - case _ => fail("Cache should be populated") + case Xor.Left(NotFound(env, _)) => env.cache shouldEqual cache + case _ => fail("Cache should be populated") }) } @@ -243,20 +233,20 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can lift errors to Fetch" in { - val fetch: Fetch[Int] = Fetch.error(NotFound()) + val fetch: Fetch[Int] = Fetch.error(DidNotFound()) ME.attempt(Fetch.run[Future](fetch)) .map(xor => xor match { - case Xor.Left(NotFound()) => assert(true) - case _ => fail("Should've thrown NotFound exception") + case Xor.Left(UnhandledException(DidNotFound())) => assert(true) + case _ => fail("Should've thrown NotFound exception") }) } "We can lift handle and recover from errors in Fetch" in { import cats.syntax.applicativeError._ - val fetch: Fetch[Int] = Fetch.error(NotFound()) + val fetch: Fetch[Int] = Fetch.error(DidNotFound()) val fut = Fetch.run[Future](fetch) ME.handleErrorWith(fut)(err => Future.successful(42)).map(_ shouldEqual 42) } @@ -316,7 +306,6 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can traverse over a list with a Fetch for each element" in { - import cats.std.list._ import cats.syntax.traverse._ val fetch: Fetch[List[Int]] = for { @@ -328,8 +317,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { fut.map(_ shouldEqual List(0, 1, 2)) } - "Traversals are implicitly concurrent" in { - import cats.std.list._ + "Traversals are implicitly batched" in { import cats.syntax.traverse._ val fetch: Fetch[List[Int]] = for { @@ -340,7 +328,25 @@ class FetchTests extends AsyncFreeSpec with Matchers { Fetch .runEnv[Future](fetch) .map(env => { - concurrent(env.rounds).size shouldEqual 1 + env.rounds.size shouldEqual 2 + }) + } + + "Identities are deduped when batched" in { + import cats.syntax.traverse._ + + val manies = List(1, 1, 2) + val fetch: Fetch[List[Int]] = for { + ones <- manies.traverse(one) + } yield ones + + Fetch + .runEnv[Future](fetch) + .map(env => { + env.rounds.size shouldEqual 1 + env.rounds.head.request should matchPattern { + case Concurrent(FetchMany(NonEmptyList(One(1), List(One(2))), source) :: Nil) => + } }) } @@ -350,7 +356,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { Fetch .runEnv[Future](fetch) .map(env => { - concurrent(env.rounds).size shouldEqual 1 + env.rounds.size shouldEqual 1 }) } @@ -365,26 +371,26 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If a fetch fails in the left hand of a product the product will fail" in { - val fetch: Fetch[(Int, List[Int])] = Fetch.join(Fetch.error(NotFound()), many(3)) + val fetch: Fetch[(Int, List[Int])] = Fetch.join(Fetch.error(DidNotFound()), many(3)) val fut = Fetch.run[Future](fetch) ME.attempt(Fetch.run[Future](fetch)) .map(xor => xor match { - case Xor.Left(NotFound()) => assert(true) - case _ => fail("Should've thrown NotFound exception") + case Xor.Left(UnhandledException(DidNotFound())) => assert(true) + case _ => fail("Should've thrown NotFound exception") }) } "If a fetch fails in the right hand of a product the product will fail" in { - val fetch: Fetch[(List[Int], Int)] = Fetch.join(many(3), Fetch.error(NotFound())) + val fetch: Fetch[(List[Int], Int)] = Fetch.join(many(3), Fetch.error(DidNotFound())) val fut = Fetch.run[Future](fetch) ME.attempt(Fetch.run[Future](fetch)) .map(xor => xor match { - case Xor.Left(NotFound()) => assert(true) - case _ => fail("Should've thrown NotFound exception") + case Xor.Left(UnhandledException(DidNotFound())) => assert(true) + case _ => fail("Should've thrown NotFound exception") }) } @@ -393,10 +399,12 @@ class FetchTests extends AsyncFreeSpec with Matchers { val fut = Fetch.run[Future](fetch) ME.attempt(Fetch.run[Future](fetch)) - .map(xor => - xor match { - case Xor.Left(FetchFailure(_)) => assert(true) - case _ => fail("Should've thrown a fetch failure") + .map(xor => { + xor match { + case Xor.Left(MissingIdentities(_, missing)) => + missing shouldEqual Map(NeverSource.name -> List(Never())) + case _ => fail("Should've thrown a fetch failure") + } }) } @@ -405,10 +413,12 @@ class FetchTests extends AsyncFreeSpec with Matchers { val fut = Fetch.run[Future](fetch) ME.attempt(fut) - .map(xor => - xor match { - case Xor.Left(FetchFailure(_)) => assert(true) - case _ => fail("Should've thrown a fetch failure") + .map(xor => { + xor match { + case Xor.Left(MissingIdentities(_, missing)) => + missing shouldEqual Map(NeverSource.name -> List(Never())) + case _ => fail("Should've thrown a fetch failure") + } }) } @@ -425,7 +435,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { .runEnv[Future](fetch) .map(env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalBatches(rounds), totalFetched(rounds)) + val stats = (rounds.size, totalBatches(rounds), totalFetched(rounds)) stats shouldEqual (1, 1, 4) }) @@ -452,7 +462,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { .runEnv[Future](fetch) .map(env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalBatches(rounds), totalFetched(rounds)) + val stats = (rounds.size, totalBatches(rounds), totalFetched(rounds)) stats shouldEqual (2, 1, 4) }) @@ -476,13 +486,13 @@ class FetchTests extends AsyncFreeSpec with Matchers { .runEnv[Future](fetch) .map(env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalBatches(rounds), totalFetched(rounds)) + val stats = (rounds.size, totalBatches(rounds), totalFetched(rounds)) stats shouldEqual (3, 3, 6) }) } - "Every level of sequenced concurrent of concurrent fetches is batched" in { + "Every level of sequenced concurrent fetches is batched" in { val fetch = Fetch.join( Fetch.join( for { @@ -503,7 +513,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { .runEnv[Future](fetch) .map(env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalBatches(rounds), totalFetched(rounds)) + val stats = (rounds.size, totalBatches(rounds), totalFetched(rounds)) stats shouldEqual (3, 3, 9 + 4 + 6) }) @@ -516,9 +526,9 @@ class FetchTests extends AsyncFreeSpec with Matchers { .runEnv[Future](fetch) .map(env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalBatches(rounds)) - stats shouldEqual (1, 1) + rounds.size shouldEqual 1 + totalBatches(rounds) shouldEqual 1 }) } @@ -553,9 +563,9 @@ class FetchTests extends AsyncFreeSpec with Matchers { .runEnv[Future](fetch) .map(env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalBatches(rounds)) - stats shouldEqual (1, 2) + rounds.size shouldEqual 1 + totalBatches(rounds) shouldEqual 2 }) } @@ -567,9 +577,9 @@ class FetchTests extends AsyncFreeSpec with Matchers { .runEnv[Future](fetch) .map(env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalFetched(rounds)) - stats shouldEqual (1, 2) + rounds.size shouldEqual 1 + totalFetched(rounds) shouldEqual 2 }) } @@ -588,9 +598,9 @@ class FetchTests extends AsyncFreeSpec with Matchers { fut.map( env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalFetched(rounds)) - stats shouldEqual (1, 2) + rounds.size shouldEqual 1 + totalFetched(rounds) shouldEqual 2 }) } @@ -600,13 +610,13 @@ class FetchTests extends AsyncFreeSpec with Matchers { Fetch.run[Future](fetch).map(_ shouldEqual List(1, 2, 3)) } - "Traversals are run concurrently" in { + "Traversals are batched" in { val fetch = Fetch.traverse(List(1, 2, 3))(one) Fetch .runEnv[Future](fetch) .map(env => { - concurrent(env.rounds).size shouldEqual 1 + env.rounds.size shouldEqual 1 }) } @@ -617,9 +627,9 @@ class FetchTests extends AsyncFreeSpec with Matchers { .runEnv[Future](fetch) .map(env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalFetched(rounds)) - stats shouldEqual (1, 2) + rounds.size shouldEqual 1 + totalFetched(rounds) shouldEqual 2 }) } @@ -633,9 +643,9 @@ class FetchTests extends AsyncFreeSpec with Matchers { .runEnv[Future](fetch) .map(env => { val rounds = env.rounds - val stats = (concurrent(rounds).size, totalFetched(rounds)) - stats shouldEqual (1, 2) + rounds.size shouldEqual 1 + totalFetched(rounds) shouldEqual 2 }) } @@ -685,7 +695,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { env => { val rounds = env.rounds - totalFetched(rounds) shouldEqual 0 + rounds.size shouldEqual 0 }) } @@ -716,7 +726,6 @@ class FetchTests extends AsyncFreeSpec with Matchers { _ <- Fetch.traverse(List(1, 2, 3))(one) _ <- one(1) } yield aOne + anotherOne - val fut = Fetch.runEnv[Future]( fetch, InMemoryCache( @@ -731,7 +740,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { env => { val rounds = env.rounds - totalFetched(rounds) shouldEqual 0 + rounds.size shouldEqual 0 }) } @@ -777,3 +786,118 @@ class FetchTests extends AsyncFreeSpec with Matchers { }) } } + +class FetchReportingTests extends AsyncFreeSpec with Matchers { + import TestHelper._ + + val ME = implicitly[FetchMonadError[Future]] + + implicit override def executionContext = ExecutionContext.Implicits.global + + "Plain values have no rounds of execution" in { + val fetch: Fetch[Int] = Fetch.pure(42) + Fetch.runEnv[Future](fetch).map(_.rounds.size shouldEqual 0) + } + + "Single fetches are executed in one round" in { + val fetch = one(1) + Fetch.runEnv[Future](fetch).map(_.rounds.size shouldEqual 1) + } + + "Single fetches are executed in one round per binding in a for comprehension" in { + val fetch = for { + o <- one(1) + t <- one(2) + } yield (o, t) + + Fetch.runEnv[Future](fetch).map(_.rounds.size shouldEqual 2) + } + + "Single fetches for different data sources are executed in multiple rounds if they are in a for comprehension" in { + val fetch: Fetch[(Int, List[Int])] = for { + o <- one(1) + m <- many(3) + } yield (o, m) + + Fetch.runEnv[Future](fetch).map(_.rounds.size shouldEqual 2) + } + + "Single fetches combined with cartesian are run in one round" in { + import cats.syntax.cartesian._ + + val fetch: Fetch[(Int, List[Int])] = (one(1) |@| many(3)).tupled + val fut = Fetch.runEnv[Future](fetch) + + fut.map(_.rounds.size shouldEqual 1) + } + + "Single fetches combined with traverse are run in one round" in { + import cats.syntax.traverse._ + + val fetch: Fetch[List[Int]] = for { + manies <- many(3) + ones <- manies.traverse(one) + } yield ones + + val fut = Fetch.runEnv[Future](fetch) + fut.map(_.rounds.size shouldEqual 2) + } + + "The product of two fetches from the same data source implies batching" in { + val fetch: Fetch[(Int, Int)] = Fetch.join(one(1), one(3)) + + Fetch + .runEnv[Future](fetch) + .map(env => { + env.rounds.size shouldEqual 1 + }) + } + + "The product of concurrent fetches of the same type implies everything fetched in batches" in { + val fetch = Fetch.join( + Fetch.join( + for { + a <- one(1) + b <- one(2) + c <- one(3) + } yield c, + for { + a <- one(2) + m <- many(4) + c <- one(3) + } yield c + ), + one(3) + ) + + Fetch + .runEnv[Future](fetch) + .map(env => { + env.rounds.size shouldEqual 2 + }) + } + + "Every level of sequenced concurrent of concurrent fetches is batched" in { + val fetch = Fetch.join( + Fetch.join( + for { + a <- Fetch.sequence(List(one(2), one(3), one(4))) + b <- Fetch.sequence(List(many(0), many(1))) + c <- Fetch.sequence(List(one(9), one(10), one(11))) + } yield c, + for { + a <- Fetch.sequence(List(one(5), one(6), one(7))) + b <- Fetch.sequence(List(many(2), many(3))) + c <- Fetch.sequence(List(one(12), one(13), one(14))) + } yield c + ), + Fetch.sequence(List(one(15), one(16), one(17))) + ) + + Fetch + .runEnv[Future](fetch) + .map(env => { + env.rounds.size shouldEqual 3 + }) + } +} diff --git a/tut/README.md b/tut/README.md index 71071aa2..ef4dc6db 100644 --- a/tut/README.md +++ b/tut/README.md @@ -45,10 +45,8 @@ To tell `Fetch` how to get the data you want, you must implement the `DataSource Data Sources take two type parameters: -
    -
  1. Identity is a type that has enough information to fetch the data. For a users data source, this would be a user's unique ID.
  2. -
  3. Result is the type of data we want to fetch. For a users data source, this would the `User` type.
  4. -
+1. `Identity` is a type that has enough information to fetch the data. For a users data source, this would be a user's unique ID. +2. `Result` is the type of data we want to fetch. For a users data source, this would the `User` type. ```scala import cats.data.NonEmptyList @@ -63,7 +61,7 @@ We'll implement a dummy data source that can convert integers to strings. For co ```tut:silent import cats.data.NonEmptyList -import cats.std.list._ +import cats.instances.list._ import fetch._ implicit object ToStringSource extends DataSource[Int, String]{ @@ -76,7 +74,7 @@ implicit object ToStringSource extends DataSource[Int, String]{ override def fetchMany(ids: NonEmptyList[Int]): Query[Map[Int, String]] = { Query.sync({ println(s"[${Thread.currentThread.getId}] Many ToString $ids") - ids.unwrap.map(i => (i, i.toString)).toMap + ids.toList.map(i => (i, i.toString)).toMap }) } } @@ -141,7 +139,7 @@ implicit object LengthSource extends DataSource[String, Int]{ override def fetchMany(ids: NonEmptyList[String]): Query[Map[String, Int]] = { Query.async((ok, fail) => { println(s"[${Thread.currentThread.getId}] Many Length $ids") - ok(ids.unwrap.map(i => (i, i.size)).toMap) + ok(ids.toList.map(i => (i, i.size)).toMap) }) } }