diff --git a/build.sbt b/build.sbt index 8d67ed1e0..5255c208c 100644 --- a/build.sbt +++ b/build.sbt @@ -45,6 +45,7 @@ val commonSettings = Seq( libraryDependencies ++= Seq( Libraries.catsEffectKernel, Libraries.redisClient, + Libraries.keyPool % Optional, Libraries.catsEffect % Test, Libraries.catsLaws % Test, Libraries.catsTestKit % Test, diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/MkRedis.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/MkRedis.scala index 03f6e4a34..953b01fad 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/MkRedis.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/MkRedis.scala @@ -52,6 +52,7 @@ sealed trait MkRedis[F[_]] { private[redis4cats] def txRunner: Resource[F, TxRunner[F]] private[redis4cats] def futureLift: FutureLift[F] private[redis4cats] def log: Log[F] + private[redis4cats] def availableProcessors: F[Int] } object MkRedis { @@ -85,6 +86,8 @@ object MkRedis { private[redis4cats] def futureLift: FutureLift[F] = implicitly private[redis4cats] def log: Log[F] = implicitly + + private[redis4cats] def availableProcessors: F[Int] = Async[F].blocking(Runtime.getRuntime.availableProcessors()) } } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala index 88274ab56..a969160eb 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala @@ -53,6 +53,7 @@ import io.lettuce.core.{ ScanCursor => JScanCursor, SetArgs => JSetArgs } +import org.typelevel.keypool.KeyPool import java.time.Instant import java.util.concurrent.TimeUnit @@ -60,6 +61,33 @@ import scala.concurrent.duration._ object Redis { + object Pool { + final case class Settings(maxTotal: Int, maxIdle: Int, idleTimeAllowedInPool: FiniteDuration) + object Settings { + object Defaults { + val minimumTotal: Int = 10 + val maxIdle: Int = 2 + val idleTimeAllowedInPool: FiniteDuration = 60.seconds + } + + def default[F[_]: MkRedis: Functor]: F[Settings] = + MkRedis[F].availableProcessors.map { cpu => + Settings( + maxTotal = Math.max(Defaults.minimumTotal, cpu), + maxIdle = Defaults.maxIdle, + idleTimeAllowedInPool = Defaults.idleTimeAllowedInPool + ) + } + } + + implicit class PoolOps[F[_], K, V](val pool: KeyPool[F, Unit, RedisCommands[F, K, V]]) extends AnyVal { + @inline def withRedisCommands[A]( + fn: RedisCommands[F, K, V] => F[A] + )(implicit M: MonadCancel[F, Throwable]): F[A] = + pool.take(()).use(managed => fn(managed.value)) + } + } + private[redis4cats] def acquireAndRelease[F[_]: FutureLift: Log: MonadThrow, K, V]( client: RedisClient, codec: RedisCodec[K, V], @@ -239,6 +267,50 @@ object Redis { Resource.make(acquire)(release).widen } + /** + * Creates a pool of [[RedisCommands]] for a single-node connection. + * + * Example: + * + * {{{ + * val pool: Resource[IO, KeyPool[IO, Unit, RedisCommands[IO, String, String]]] = + * for { + * uri <- Resource.eval(RedisURI.make[IO]("redis://localhost")) + * cli <- RedisClient[IO](uri) + * pool <- Redis[IO].pooled(cli, RedisCodec.Utf8) + * } yield pool + * + * pool.use(_.withRedisCommands(redis => redis.set(usernameKey, "some value"))) + * }}} + * + */ + def pooled[K, V]( + client: RedisClient, + codec: RedisCodec[K, V] + )(implicit T: Temporal[F]): Resource[F, KeyPool[F, Unit, RedisCommands[F, K, V]]] = + Resource + .eval(Redis.Pool.Settings.default[F]) + .flatMap(poolSettings => customPooled[K, V](client, codec, poolSettings)) + + /** + * Creates a pool of [[RedisCommands]] for a single-node connection. + * Similar to [[pooled]] but allows custom [[Redis.Pool.Settings]] + */ + def customPooled[K, V]( + client: RedisClient, + codec: RedisCodec[K, V], + poolSettings: Redis.Pool.Settings + )(implicit T: Temporal[F]): Resource[F, KeyPool[F, Unit, RedisCommands[F, K, V]]] = { + val cmdsResource: Resource[F, RedisCommands[F, K, V]] = fromClient(client, codec) + KeyPool + .Builder[F, Unit, RedisCommands[F, K, V]]((_: Unit) => cmdsResource) + .withMaxPerKey(Function.const(poolSettings.maxTotal)) + .withMaxTotal(poolSettings.maxTotal) + .withMaxIdle(poolSettings.maxIdle) + .withIdleTimeAllowedInPool(poolSettings.idleTimeAllowedInPool) + .build + } + /** * Creates a [[RedisCommands]] for a cluster connection. * diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPoolDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPoolDemo.scala new file mode 100644 index 000000000..34540734e --- /dev/null +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPoolDemo.scala @@ -0,0 +1,89 @@ +/* + * Copyright 2018-2021 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats + +import cats.effect.IO +import dev.profunktor.redis4cats.connection._ +import dev.profunktor.redis4cats.effect.Log.NoOp._ +import dev.profunktor.redis4cats.Redis.Pool._ +import io.lettuce.core.RedisCommandExecutionException +import org.typelevel.keypool.KeyPool +import fs2.Stream + +object RedisPoolDemo extends LoggerIOApp { + import Demo._ + + val usernameKey = "test" + val numericKey = "numeric" + + val showResult: Option[String] => IO[Unit] = + _.fold(IO.println(s"Not found key: $usernameKey"))(IO.println) + + // simple strings program + def p1(stringPool: KeyPool[IO, Unit, RedisCommands[IO, String, String]]): IO[Unit] = + stringPool.withRedisCommands { redis => + for { + x <- redis.get(usernameKey) + _ <- showResult(x) + _ <- redis.set(usernameKey, "some value") + y <- redis.get(usernameKey) + _ <- showResult(y) + _ <- redis.setNx(usernameKey, "should not happen") + w <- redis.get(usernameKey) + _ <- showResult(w) + } yield () + } + + // proof that you can still get it wrong with `incr` and `decr`, even if type-safe + def p2( + stringPool: KeyPool[IO, Unit, RedisCommands[IO, String, String]], + longPool: KeyPool[IO, Unit, RedisCommands[IO, String, Long]] + ): IO[Unit] = + stringPool.withRedisCommands { redis => + longPool.withRedisCommands { redisN => + for { + x <- redis.get(numericKey) + _ <- showResult(x) + _ <- redis.set(numericKey, "not a number") + y <- redis.get(numericKey) + _ <- showResult(y) + _ <- redisN.incr(numericKey).attempt.flatMap { + case Left(e: RedisCommandExecutionException) => + IO(assert(e.getMessage == "ERR value is not an integer or out of range")) + case _ => + IO.raiseError(new Exception("Expected error")) + } + w <- redis.get(numericKey) + _ <- showResult(w) + } yield () + } + } + + val program: IO[Unit] = { + val res: Stream[IO, Unit] = + for { + cli <- Stream.resource(RedisClient[IO].from(redisURI)) + rd1 <- Stream.resource(Redis[IO].pooled(cli, stringCodec)) + rd2 <- Stream.resource(Redis[IO].pooled(cli, longCodec)) + _ <- Stream.eval(p1(rd1) *> p2(rd1, rd2)) + } yield () + + res.compile.lastOrError + + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ab99d38d1..ae0fd6988 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,6 +8,7 @@ object Dependencies { val circe = "0.14.6" val fs2 = "3.9.4" val log4cats = "2.6.0" + val keyPool = "0.4.8" val lettuce = "6.3.2.RELEASE" val logback = "1.5.3" @@ -23,6 +24,7 @@ object Dependencies { val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % V.catsEffect val fs2Core = "co.fs2" %% "fs2-core" % V.fs2 + val keyPool = "org.typelevel" %% "keypool" % V.keyPool val log4CatsCore = log4cats("core")