diff --git a/docs/src/main/tut/docs.md b/docs/src/main/tut/docs.md
index d4f9de20..b679945a 100644
--- a/docs/src/main/tut/docs.md
+++ b/docs/src/main/tut/docs.md
@@ -481,7 +481,7 @@ We'll be using the default in-memory cache, prepopulated with some data. The cac
 is calculated with the `DataSource`'s `name` method and the request identity.

 ```tut:silent
-val cache = InMemoryCache.from(
+def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, Int, User](
   (UserSource.name, 1) -> User(1, "@dialelo")
 )
 ```
@@ -526,32 +526,37 @@ The default cache is implemented as an immutable in-memory map, but users are fr
 There is no need for the cache to be mutable since fetch executions run in an interpreter that uses the state monad. Note that the `update` method in the `DataSourceCache` trait yields a new, updated cache.

 ```scala
-trait DataSourceCache {
-  def insert[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A], v: A): DataSourceIdentity, v: A): F[DataSourceCache]
-  def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]]
+trait DataSourceCache[F[_]] {
+  def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]]
+  def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]]
 }
 ```

 Let's implement a cache that forgets everything we store in it.

 ```tut:silent
-import cats.Applicative
+import cats.{Applicative, Monad}

-final case class ForgetfulCache() extends DataSourceCache {
-  def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] = Applicative[F].pure(this)
-  def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = Applicative[F].pure(None)
+case class ForgetfulCache[F[_] : Monad]() extends DataSourceCache[F] {
+  def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] =
+    Applicative[F].pure(this)
+
+  def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] =
+    Applicative[F].pure(None)
 }
+
+def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]()
 ```

 We can now use our implementation of the cache when running a fetch.

 ```tut:book
-def fetchSameTwice[F[_] : ConcurrentEffect]: Fetch[F, (User, User)] = for {
+def fetchSameTwice[F[_] : ConcurrentEffect : Par]: Fetch[F, (User, User)] = for {
   one <- getUser(1)
   another <- getUser(1)
 } yield (one, another)

-Fetch.run[IO](fetchSameTwice, ForgetfulCache()).unsafeRunTimed(5.seconds)
+Fetch.run[IO](fetchSameTwice, forgetfulCache).unsafeRunTimed(5.seconds)
 ```

 # Batching
diff --git a/shared/src/main/scala/cache.scala b/shared/src/main/scala/cache.scala
index 95c524f0..ae1916c2 100644
--- a/shared/src/main/scala/cache.scala
+++ b/shared/src/main/scala/cache.scala
@@ -17,9 +17,11 @@
 package fetch

 import cats._
+import cats.effect._
+import cats.data.NonEmptyList
 import cats.instances.list._
 import cats.syntax.all._
-import cats.effect._
+import cats.temp.par._

 final class DataSourceName(val name: String) extends AnyVal
 final class DataSourceId(val id: Any) extends AnyVal
@@ -28,42 +30,39 @@ final class DataSourceResult(val result: Any) extends AnyVal

 /**
  * A `Cache` trait so the users of the library can provide their own cache.
 */
-trait DataSourceCache {
-  def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]]
-  def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache]
-  def insertMany[F[_]: ConcurrentEffect, I, A](vs: Map[I, A], ds: DataSource[I, A]): F[DataSourceCache] =
-    vs.toList.foldLeftM(this)({
-      case (c, (i, v)) => c.insert(i, v, ds)
-    })
+trait DataSourceCache[F[_]] {
+  def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]]
+
+  def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]]
+
+  // def delete[I, A](i: I, v: A, ds: DataSource[I, A]): F[Unit]
+
+  def bulkInsert[I, A](vs: List[(I, A)], ds: DataSource[I, A])(
+    implicit M: Monad[F]
+  ): F[DataSourceCache[F]] = {
+    vs.foldLeftM(this){
+      case (acc, (i, v)) =>
+        acc.insert(i, v, ds)
+    }
+  }
 }

 /**
  * A cache that stores its elements in memory.
  */
-case class InMemoryCache(state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache {
-  def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] =
+case class InMemoryCache[F[_] : Monad](state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache[F] {
+  def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] =
     Applicative[F].pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A]))

-  def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] =
+  def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] =
     Applicative[F].pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v))))
 }

 object InMemoryCache {
-  def empty: InMemoryCache = InMemoryCache(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])
+  def empty[F[_] : Monad]: InMemoryCache[F] = InMemoryCache[F](Map.empty[(DataSourceName, DataSourceId), DataSourceResult])

-  def from[I, A](results: ((String, I), A)*): InMemoryCache =
-    InMemoryCache(results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({
+  def from[F[_]: Monad, I, A](results: ((String, I), A)*): InMemoryCache[F] =
+    InMemoryCache[F](results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({
      case (acc, ((s, i), v)) => acc.updated((new DataSourceName(s), new DataSourceId(i)), new DataSourceResult(v))
    }))
-
-  implicit val inMemoryCacheMonoid: Monoid[InMemoryCache] = {
-    implicit val anySemigroup = new Semigroup[Any] {
-      def combine(a: Any, b: Any): Any = b
-    }
-    new Monoid[InMemoryCache] {
-      def empty: InMemoryCache = InMemoryCache.empty
-      def combine(c1: InMemoryCache, c2: InMemoryCache): InMemoryCache =
-        InMemoryCache(c1.state ++ c2.state)
-    }
-  }
 }
diff --git a/shared/src/main/scala/fetch.scala b/shared/src/main/scala/fetch.scala
index 875be580..39a5afed 100644
--- a/shared/src/main/scala/fetch.scala
+++ b/shared/src/main/scala/fetch.scala
@@ -268,9 +268,20 @@ object `package` {

     def run[F[_]]: FetchRunner[F] = new FetchRunner[F]

     private[fetch] class FetchRunner[F[_]](private val dummy: Boolean = true) extends AnyVal {
+      def apply[A](
+          fa: Fetch[F, A]
+      )(
+          implicit
+          P: Par[F],
+          C: ConcurrentEffect[F],
+          CS: ContextShift[F],
+          T: Timer[F]
+      ): F[A] =
+        apply(fa, InMemoryCache.empty[F])
+
       def apply[A](
           fa: Fetch[F, A],
-          cache: DataSourceCache = InMemoryCache.empty
+          cache: DataSourceCache[F]
       )(
           implicit
@@ -278,7 +289,7 @@ object `package` {
           CS: ContextShift[F],
           T: Timer[F]
       ): F[A] = for {
-        cache  <- Ref.of[F, DataSourceCache](cache)
+        cache  <- Ref.of[F, DataSourceCache[F]](cache)
         result <- performRun(fa, cache, None)
       } yield result
     }
@@ -289,9 +300,20 @@ object `package` {
     def runEnv[F[_]]: FetchRunnerEnv[F] = new FetchRunnerEnv[F]

     private[fetch] class FetchRunnerEnv[F[_]](private val dummy: Boolean = true) extends AnyVal {
+      def apply[A](
+          fa: Fetch[F, A]
+      )(
+          implicit
+          P: Par[F],
+          C: ConcurrentEffect[F],
+          CS: ContextShift[F],
+          T: Timer[F]
+      ): F[(Env, A)] =
+        apply(fa, InMemoryCache.empty[F])
+
       def apply[A](
           fa: Fetch[F, A],
-          cache: DataSourceCache = InMemoryCache.empty
+          cache: DataSourceCache[F]
       )(
           implicit
           P: Par[F],
@@ -300,7 +322,7 @@ object `package` {
           T: Timer[F]
       ): F[(Env, A)] = for {
         env    <- Ref.of[F, Env](FetchEnv())
-        cache  <- Ref.of[F, DataSourceCache](cache)
+        cache  <- Ref.of[F, DataSourceCache[F]](cache)
         result <- performRun(fa, cache, Some(env))
         e      <- env.get
       } yield (e, result)
@@ -312,17 +334,28 @@ object `package` {
     def runCache[F[_]]: FetchRunnerCache[F] = new FetchRunnerCache[F]

     private[fetch] class FetchRunnerCache[F[_]](private val dummy: Boolean = true) extends AnyVal {
+      def apply[A](
+          fa: Fetch[F, A]
+      )(
+          implicit
+          P: Par[F],
+          C: ConcurrentEffect[F],
+          CS: ContextShift[F],
+          T: Timer[F]
+      ): F[(DataSourceCache[F], A)] =
+        apply(fa, InMemoryCache.empty[F])
+
       def apply[A](
           fa: Fetch[F, A],
-          cache: DataSourceCache = InMemoryCache.empty
+          cache: DataSourceCache[F]
       )(
           implicit
           P: Par[F],
           C: ConcurrentEffect[F],
           CS: ContextShift[F],
           T: Timer[F]
-      ): F[(DataSourceCache, A)] = for {
-        cache  <- Ref.of[F, DataSourceCache](cache)
+      ): F[(DataSourceCache[F], A)] = for {
+        cache  <- Ref.of[F, DataSourceCache[F]](cache)
         result <- performRun(fa, cache, None)
         c      <- cache.get
       } yield (c, result)
@@ -332,7 +365,7 @@ object `package` {

   private def performRun[F[_], A](
       fa: Fetch[F, A],
-      cache: Ref[F, DataSourceCache],
+      cache: Ref[F, DataSourceCache[F]],
       env: Option[Ref[F, Env]]
   )(
       implicit
@@ -360,7 +393,7 @@ object `package` {

   private def fetchRound[F[_], A](
       rs: RequestMap[F],
-      cache: Ref[F, DataSourceCache],
+      cache: Ref[F, DataSourceCache[F]],
       env: Option[Ref[F, Env]]
   )(
       implicit
@@ -387,7 +420,7 @@ object `package` {

   private def runBlockedRequest[F[_], A](
       blocked: BlockedRequest[F],
-      cache: Ref[F, DataSourceCache],
+      cache: Ref[F, DataSourceCache[F]],
       env: Option[Ref[F, Env]]
   )(
       implicit
@@ -405,7 +438,7 @@ object `package` {
   private def runFetchOne[F[_]](
       q: FetchOne[Any, Any],
       putResult: FetchStatus => F[Unit],
-      cache: Ref[F, DataSourceCache],
+      cache: Ref[F, DataSourceCache[F]],
       env: Option[Ref[F, Env]]
   )(
       implicit
@@ -450,7 +483,7 @@ object `package` {
   private def runBatch[F[_]](
       q: Batch[Any, Any],
       putResult: FetchStatus => F[Unit],
-      cache: Ref[F, DataSourceCache],
+      cache: Ref[F, DataSourceCache[F]],
       env: Option[Ref[F, Env]]
   )(
       implicit
@@ -493,7 +526,7 @@ object `package` {
         endTime <- T.clock.monotonic(MILLISECONDS)

         resultMap = combineBatchResults(batchedRequest.results, cachedResults)
-        updatedCache <- c.insertMany(batchedRequest.results, request.ds)
+        updatedCache <- c.bulkInsert(batchedRequest.results.toList, request.ds)
         _ <- cache.set(updatedCache)

         result <- putResult(FetchDone[Map[Any, Any]](resultMap))
diff --git a/shared/src/test/scala/FetchAsyncQueryTests.scala b/shared/src/test/scala/FetchAsyncQueryTests.scala
index b8d6a866..5678c312 100644
--- a/shared/src/test/scala/FetchAsyncQueryTests.scala
+++ b/shared/src/test/scala/FetchAsyncQueryTests.scala
@@ -33,7 +33,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers {
   implicit val cs: 
ContextShift[IO] = IO.contextShift(executionContext) "We can interpret an async fetch into an IO" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Article] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Article] = article(1) val io = Fetch.run[IO](fetch) @@ -42,7 +42,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers { } "We can combine several async data sources and interpret a fetch into an IO" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Article, Author)] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Article, Author)] = for { art <- article(1) author <- author(art) } yield (art, author) @@ -53,7 +53,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers { } "We can use combinators in a for comprehension and interpret a fetch from async sources into an IO" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Article]] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Article]] = for { articles <- List(1, 1, 2).traverse(article[F]) } yield articles @@ -67,7 +67,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers { } "We can use combinators and multiple sources in a for comprehension and interpret a fetch from async sources into an IO" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { articles <- List(1, 1, 2).traverse(article[F]) authors <- articles.traverse(author[F]) } yield (articles, authors) @@ -108,7 +108,7 @@ object DataSources { }) } - def article[F[_] : ConcurrentEffect](id: Int): Fetch[F, Article] = + def article[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, Article] = Fetch(ArticleId(id), ArticleAsync) case class AuthorId(id: Int) @@ -127,6 +127,6 @@ object DataSources { })) } - def author[F[_] : ConcurrentEffect](a: Article): Fetch[F, Author] = + def author[F[_] : ConcurrentEffect : Par](a: Article): Fetch[F, Author] = Fetch(AuthorId(a.author), AuthorAsync) } diff --git a/shared/src/test/scala/FetchReportingTests.scala b/shared/src/test/scala/FetchReportingTests.scala index 8a593e34..1a829d15 100644 --- a/shared/src/test/scala/FetchReportingTests.scala +++ b/shared/src/test/scala/FetchReportingTests.scala @@ -22,6 +22,7 @@ import org.scalatest.{AsyncFreeSpec, Matchers} import fetch._ +import cats.temp.par._ import cats.effect._ import cats.instances.list._ import cats.syntax.all._ @@ -34,7 +35,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { implicit val cs: ContextShift[IO] = IO.contextShift(executionContext) "Plain values have no rounds of execution" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = Fetch.pure[F, Int](42) val io = Fetch.runEnv[IO](fetch) @@ -45,7 +46,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches are executed in one round" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = one(1) val io = Fetch.runEnv[IO](fetch) @@ -56,7 +57,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches are executed in one round per binding in a for comprehension" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { o <- one(1) t <- one(2) } yield (o, t) @@ -69,7 +70,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches for different data sources are executed in multiple rounds if they are in a for comprehension" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : 
ConcurrentEffect : Par] = for { o <- one(1) m <- many(3) } yield (o, m) @@ -82,7 +83,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches combined with cartesian are run in one round" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = (one(1), many(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -93,7 +94,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches combined with traverse are run in one round" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { manies <- many(3) // round 1 ones <- manies.traverse(one[F]) // round 2 } yield ones @@ -106,7 +107,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "The product of two fetches from the same data source implies batching" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = (one(1), one(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -117,25 +118,28 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "The product of concurrent fetches of the same type implies everything fetched in batches" in { - def aFetch[F[_] : ConcurrentEffect] = for { - a <- one(1) // round 1 - b <- one(2) // round 2 - c <- one(3) + def aFetch[F[_] : ConcurrentEffect : Par] = for { + a <- one(1) // round 1 (batched) + b <- one(2) // round 2 (cached) + c <- one(3) // round 3 (deduplicated) } yield c - def anotherFetch[F[_] : ConcurrentEffect] = for { - a <- one(2) // round 1 + def anotherFetch[F[_] : ConcurrentEffect : Par] = for { + a <- one(2) // round 1 (batched) m <- many(4) // round 2 - c <- one(3) + c <- one(3) // round 3 (deduplicated) } yield c - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = ((aFetch, anotherFetch).tupled, one(3)).tupled val io = Fetch.runEnv[IO](fetch) io.map({ - case (env, result) => env.rounds.size shouldEqual 2 + case (env, result) => + env.rounds.size shouldEqual 2 + totalBatches(env.rounds) shouldEqual 1 + totalFetched(env.rounds) shouldEqual 3 + 1 }).unsafeToFuture } } diff --git a/shared/src/test/scala/FetchTests.scala b/shared/src/test/scala/FetchTests.scala index b1c6cb00..095e1030 100644 --- a/shared/src/test/scala/FetchTests.scala +++ b/shared/src/test/scala/FetchTests.scala @@ -39,7 +39,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { // Fetch ops "We can lift plain values to Fetch" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = Fetch.pure[F, Int](42) Fetch.run[IO](fetch).map(_ shouldEqual 42).unsafeToFuture @@ -50,14 +50,14 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can map over Fetch values" in { - def fetch[F[_] : ConcurrentEffect : ContextShift]: Fetch[F, (Int)] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int)] = one(1).map(_ + 1) Fetch.run[IO](fetch).map(_ shouldEqual 2).unsafeToFuture } "We can use fetch inside a for comprehension" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, Int)] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, Int)] = for { o <- one(1) t <- one(2) } yield (o, t) @@ -66,7 +66,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can mix data sources" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, List[Int])] = for { o <- one(1) m <- many(3) } yield (o, m) @@ -75,7 +75,7 @@ 
class FetchTests extends AsyncFreeSpec with Matchers { } "We can use Fetch as a cartesian" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled val io = Fetch.run[IO](fetch) @@ -83,7 +83,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can use Fetch as an applicative" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, Int] = (one(1), one(2), one(3)).mapN(_ + _ + _) + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = (one(1), one(2), one(3)).mapN(_ + _ + _) val io = Fetch.run[IO](fetch) @@ -91,7 +91,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can traverse over a list with a Fetch for each element" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = for { manies <- many(3) ones <- manies.traverse(one[F]) } yield ones @@ -102,7 +102,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can depend on previous computations of Fetch values" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, Int] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = for { o <- one(1) t <- one(o + 1) } yield o + t @@ -113,7 +113,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can collect a list of Fetch into one" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(3)).sequence val io = Fetch.run[IO](fetch) @@ -122,7 +122,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can collect a list of Fetches with heterogeneous sources" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(3), anotherOne(4), anotherOne(5)).sequence val io = Fetch.run[IO](fetch) @@ -131,7 +131,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can collect the results of a traversal" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(1, 2, 3).traverse(one[F]) val io = Fetch.run[IO](fetch) @@ -142,7 +142,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { // Execution model "Monadic bind implies sequential execution" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, Int)] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, Int)] = for { o <- one(1) t <- one(2) @@ -159,7 +159,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Traversals are implicitly batched" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = for { manies <- many(3) ones <- manies.traverse(one[F]) @@ -176,7 +176,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sequencing is implicitly batched" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(3)).sequence val io = Fetch.runEnv[IO](fetch) @@ -193,7 +193,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { "Identities are deduped when batched" in { val sources = List(1, 1, 2) - def fetch[F[_] : ContextShift : 
ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = sources.traverse(one[F]) val io = Fetch.runEnv[IO](fetch) @@ -211,7 +211,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of two fetches implies parallel fetching" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -226,7 +226,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Concurrent fetching calls batches only when it can" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -241,7 +241,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Concurrent fetching performs requests to multiple data sources in parallel" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, ((Int, List[Int]), Int)] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, ((Int, List[Int]), Int)] = ((one(1), many(2)).tupled, anotherOne(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -256,7 +256,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of concurrent fetches implies everything fetched concurrently" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = ( + def fetch[F[_] : ConcurrentEffect : Par] = ( ( one(1), (one(2), one(3)).tupled @@ -277,18 +277,18 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of concurrent fetches of the same type implies everything fetched in a single batch" in { - def aFetch[F[_] : ContextShift : ConcurrentEffect] = for { + def aFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(1) // round 1 b <- many(1) // round 2 c <- one(1) } yield c - def anotherFetch[F[_] : ContextShift : ConcurrentEffect] = for { + def anotherFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(2) // round 1 m <- many(2) // round 2 c <- one(2) } yield c - def fetch[F[_] : ContextShift : ConcurrentEffect] = ( + def fetch[F[_] : ConcurrentEffect : Par] = ( (aFetch[F], anotherFetch[F]).tupled, one(3) // round 1 ).tupled @@ -306,18 +306,18 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Every level of joined concurrent fetches is combined and batched" in { - def aFetch[F[_] : ContextShift : ConcurrentEffect] = for { + def aFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(1) // round 1 b <- many(1) // round 2 c <- one(1) } yield c - def anotherFetch[F[_] : ContextShift : ConcurrentEffect] = for { + def anotherFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(2) // round 1 m <- many(2) // round 2 c <- one(2) } yield c - def fetch[F[_] : ContextShift : ConcurrentEffect] = (aFetch[F], anotherFetch[F]).tupled + def fetch[F[_] : ConcurrentEffect : Par] = (aFetch[F], anotherFetch[F]).tupled val io = Fetch.runEnv[IO](fetch) @@ -332,21 +332,21 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Every level of sequenced concurrent fetches is batched" in { - def aFetch[F[_] : ContextShift : ConcurrentEffect] = + def aFetch[F[_] : ConcurrentEffect : Par] = for { a <- List(2, 3, 4).traverse(one[F]) // round 1 b <- List(0, 1).traverse(many[F]) // round 2 c <- List(9, 10, 11).traverse(one[F]) // round 3 } yield c - def anotherFetch[F[_] : ContextShift : ConcurrentEffect] = + def anotherFetch[F[_] : ConcurrentEffect : Par] = for { a <- List(5, 
6, 7).traverse(one[F]) // round 1 b <- List(2, 3).traverse(many[F]) // round 2 c <- List(12, 13, 14).traverse(one[F]) // round 3 } yield c - def fetch[F[_] : ContextShift : ConcurrentEffect] = ( + def fetch[F[_] : ConcurrentEffect : Par] = ( (aFetch[F], anotherFetch[F]).tupled, List(15, 16, 17).traverse(one[F]) // round 1 ).tupled @@ -364,7 +364,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of two fetches from the same data source implies batching" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, Int)] = (one(1), one(3)).tupled + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, Int)] = (one(1), one(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -379,7 +379,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sequenced fetches are run concurrently" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(3), anotherOne(4), anotherOne(5)).sequence val io = Fetch.runEnv[IO](fetch) @@ -394,7 +394,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sequenced fetches are deduped" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(1)).sequence val io = Fetch.runEnv[IO](fetch) @@ -410,7 +410,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Traversals are batched" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(1, 2, 3).traverse(one[F]) val io = Fetch.runEnv[IO](fetch) @@ -425,7 +425,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Duplicated sources are only fetched once" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(1, 2, 1).traverse(one[F]) val io = Fetch.runEnv[IO](fetch) @@ -440,7 +440,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sources that can be fetched concurrently inside a for comprehension will be" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = for { v <- Fetch.pure[F, List[Int]](List(1, 2, 1)) result <- v.traverse(one[F]) @@ -458,13 +458,13 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Pure Fetches allow to explore further in the Fetch" in { - def aFetch[F[_] : ContextShift : ConcurrentEffect] = + def aFetch[F[_] : ConcurrentEffect : Par] = for { a <- Fetch.pure[F, Int](2) b <- one[F](3) } yield a + b - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, Int)] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, Int)] = (one(1), aFetch[F]).tupled val io = Fetch.runEnv[IO](fetch) @@ -481,7 +481,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { // Caching "Elements are cached and thus not fetched more than once" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -503,7 +503,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Batched elements are cached and thus not fetched more than once" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { _ <- List(1, 2, 3).traverse(one[F]) aOne <- one(1) anotherOne <- one(1) @@ -514,7 +514,7 @@ class 
FetchTests extends AsyncFreeSpec with Matchers { _ <- one(1) } yield aOne + anotherOne - val io = Fetch.runEnv(fetch) + val io = Fetch.runEnv[IO](fetch) io.map({ case (env, result) => { @@ -526,7 +526,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Elements that are cached won't be fetched" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -537,7 +537,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { _ <- one(1) } yield aOne + anotherOne - val cache = InMemoryCache.from( + def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, One, Int]( (OneSource.name, One(1)) -> 1, (OneSource.name, One(2)) -> 2, (OneSource.name, One(3)) -> 3 @@ -554,16 +554,89 @@ class FetchTests extends AsyncFreeSpec with Matchers { }).unsafeToFuture } - case class ForgetfulCache() extends DataSourceCache { - def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] = + "Fetch#run accepts a cache as the second (optional) parameter" in { + def fetch[F[_] : ConcurrentEffect : Par] = for { + aOne <- one(1) + anotherOne <- one(1) + _ <- one(1) + _ <- one(2) + _ <- one(3) + _ <- one(1) + _ <- List(1, 2, 3).traverse(one[F]) + _ <- one(1) + } yield aOne + anotherOne + + def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, One, Int]( + (OneSource.name, One(1)) -> 1, + (OneSource.name, One(2)) -> 2, + (OneSource.name, One(3)) -> 3 + ) + + val io = Fetch.run[IO](fetch, cache) + + io.map(_ shouldEqual 2).unsafeToFuture + } + + "Fetch#runCache accepts a cache as the second (optional) parameter" in { + def fetch[F[_] : ConcurrentEffect : Par] = for { + aOne <- one(1) + anotherOne <- one(1) + _ <- one(1) + _ <- one(2) + _ <- one(3) + _ <- one(1) + _ <- List(1, 2, 3).traverse(one[F]) + _ <- one(1) + } yield aOne + anotherOne + + def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, One, Int]( + (OneSource.name, One(1)) -> 1, + (OneSource.name, One(2)) -> 2, + (OneSource.name, One(3)) -> 3 + ) + + val io = Fetch.runCache[IO](fetch, cache) + + io.map({ + case (c, result) => { + result shouldEqual 2 + } + }).unsafeToFuture + } + + "Fetch#runCache works without the optional cache parameter" in { + def fetch[F[_] : ConcurrentEffect : Par] = for { + aOne <- one(1) + anotherOne <- one(1) + _ <- one(1) + _ <- one(2) + _ <- one(3) + _ <- one(1) + _ <- List(1, 2, 3).traverse(one[F]) + _ <- one(1) + } yield aOne + anotherOne + + val io = Fetch.runCache[IO](fetch) + + io.map({ + case (c, result) => { + result shouldEqual 2 + } + }).unsafeToFuture + } + + case class ForgetfulCache[F[_] : Monad]() extends DataSourceCache[F] { + def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = Applicative[F].pure(this) - def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = Applicative[F].pure(None) } + def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]() + "We can use a custom cache that discards elements" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -573,8 +646,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { _ <- one(1) } yield aOne + anotherOne - val cache = ForgetfulCache() - val io = Fetch.runEnv[IO](fetch, cache) + val io = Fetch.runEnv[IO](fetch, forgetfulCache) io.map({ case 
(env, result) => { @@ -586,7 +658,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can use a custom cache that discards elements together with concurrent fetches" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -597,8 +669,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { _ <- one(1) } yield aOne + anotherOne - val cache = ForgetfulCache() - val io = Fetch.runEnv[IO](fetch, cache) + val io = Fetch.runEnv[IO](fetch, forgetfulCache) io.map({ case (env, result) => { @@ -629,7 +700,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Data sources with errors won't fail if they're cached" in { - val cache = InMemoryCache.from( + def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, Never, Int]( (NeverSource.name, Never()) -> 1 ) val io = Fetch.run[IO](never, cache) @@ -637,7 +708,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { io.map(_ shouldEqual 1).unsafeToFuture } - def fetchError[F[_] : ConcurrentEffect : ContextShift]: Fetch[F, Int] = + def fetchError[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = Fetch.error(AnException()) "We can lift errors to Fetch" in { @@ -658,7 +729,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If a fetch fails in the left hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect : Par] = (fetchError, many(3)).tupled val io = Fetch.run[IO](fetch) @@ -670,7 +741,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If a fetch fails in the right hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect : Par] = (many(3), fetchError).tupled val io = Fetch.run[IO](fetch) @@ -682,7 +753,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If there is a missing identity in the left hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect : Par] = (never, many(3)).tupled val io = Fetch.run[IO](fetch) @@ -694,7 +765,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If there is a missing identity in the right hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect : Par] = (many(3), never).tupled val io = Fetch.run[IO](fetch) @@ -706,7 +777,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If there are multiple failing identities the fetch will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect : Par] = (never, never).tupled val io = Fetch.run[IO](fetch) @@ -733,25 +804,25 @@ class FetchTests extends AsyncFreeSpec with Matchers { Applicative[F].pure(Option(id.id)) } - def maybeOpt[F[_] : ConcurrentEffect](id: Int): Fetch[F, Option[Int]] = + def maybeOpt[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, Option[Int]] = Fetch.optional(MaybeMissing(id), MaybeMissingSource) "We can run optional fetches" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Option[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Option[Int]] = maybeOpt(1) Fetch.run[IO](fetch).map(_ shouldEqual Some(1)).unsafeToFuture } "We can run optional fetches with traverse" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = 
List(1, 2, 3).traverse(maybeOpt[F]).map(_.flatten) Fetch.run[IO](fetch).map(_ shouldEqual List(1, 3)).unsafeToFuture } "We can run optional fetches with other data sources" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = { val ones = List(1, 2, 3).traverse(one[F]) val maybes = List(1, 2, 3).traverse(maybeOpt[F]) (ones, maybes).mapN { case (os, ms) => os ++ ms.flatten } @@ -761,7 +832,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can make fetches that depend on optional fetch results when they aren't defined" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = for { maybe <- maybeOpt(2) result <- maybe.fold(Fetch.pure(42))(i => one(i)) } yield result @@ -770,7 +841,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can make fetches that depend on optional fetch results when they are defined" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = for { maybe <- maybeOpt(1) result <- maybe.fold(Fetch.pure(42))(i => one(i)) } yield result diff --git a/shared/src/test/scala/TestHelper.scala b/shared/src/test/scala/TestHelper.scala index c59d0ade..8aeb321e 100644 --- a/shared/src/test/scala/TestHelper.scala +++ b/shared/src/test/scala/TestHelper.scala @@ -45,7 +45,7 @@ object TestHelper { ) } - def one[F[_] : ConcurrentEffect](id: Int): Fetch[F, Int] = + def one[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, Int] = Fetch(One(id), OneSource) case class Many(n: Int) @@ -57,7 +57,7 @@ object TestHelper { Applicative[F].pure(Option(0 until id.n toList)) } - def many[F[_] : ConcurrentEffect](id: Int): Fetch[F, List[Int]] = + def many[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, List[Int]] = Fetch(Many(id), ManySource) case class AnotherOne(id: Int) @@ -74,7 +74,7 @@ object TestHelper { ) } - def anotherOne[F[_] : ConcurrentEffect](id: Int): Fetch[F, Int] = + def anotherOne[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, Int] = Fetch(AnotherOne(id), AnotheroneSource) case class Never() @@ -86,7 +86,7 @@ object TestHelper { Applicative[F].pure(None : Option[Int]) } - def never[F[_] : ConcurrentEffect]: Fetch[F, Int] = + def never[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = Fetch(Never(), NeverSource) // Check Env
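---

For reference, here is a minimal sketch of what a user-defined cache looks like against the `F[_]`-parameterised `DataSourceCache[F]` trait introduced in this change. `BoundedCache`, `boundedCache` and `maxSize` are illustrative names, not part of the library; the sketch assumes only the `lookup`/`insert` signatures and the `ds.name` accessor shown in the diff above.

```scala
import cats.{Applicative, Monad}
import fetch._

// Illustrative only: an immutable cache that stops accepting new entries once
// `maxSize` identities are stored. Keys pair the data source name with the
// request identity, mirroring what InMemoryCache does in this change.
case class BoundedCache[F[_] : Monad](maxSize: Int, state: Map[(String, Any), Any])
    extends DataSourceCache[F] {

  def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] =
    Applicative[F].pure(state.get((ds.name, i)).map(_.asInstanceOf[A]))

  def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] =
    Applicative[F].pure(
      if (state.size >= maxSize) this
      else copy(state = state.updated((ds.name, i), v))
    )
}

// Caches are now constructed for a concrete effect, like InMemoryCache.empty[F].
def boundedCache[F[_] : Monad]: DataSourceCache[F] =
  BoundedCache[F](maxSize = 100, Map.empty)

// It can then be passed to any runner overload that takes a cache, in the same
// way the ForgetfulCache example in the docs is passed to Fetch.run[IO].
```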