From 29e71293257e3c21c14978a56902755401665976 Mon Sep 17 00:00:00 2001 From: a-khakimov Date: Wed, 23 Apr 2025 22:50:23 +0500 Subject: [PATCH] #998 Client side caching --- build.sbt | 1 + .../redis4cats/caching/CacheAccessor.scala | 23 +++++++++ .../caching/ClientSideCaching.scala | 51 +++++++++++++++++++ .../redis4cats/effect/TxExecutor.scala | 31 ++++++----- .../profunktor/redis4cats/tx/TxRunner.scala | 6 +-- .../dev/profunktor/redis4cats/commands.scala | 3 +- .../dev/profunktor/redis4cats/redis.scala | 3 +- project/Dependencies.scala | 1 + 8 files changed, 101 insertions(+), 18 deletions(-) create mode 100644 modules/core/src/main/scala/dev/profunktor/redis4cats/caching/CacheAccessor.scala create mode 100644 modules/core/src/main/scala/dev/profunktor/redis4cats/caching/ClientSideCaching.scala diff --git a/build.sbt b/build.sbt index ee1daaace..ae5685d00 100644 --- a/build.sbt +++ b/build.sbt @@ -44,6 +44,7 @@ val commonSettings = Seq( testFrameworks += new TestFramework("munit.Framework"), libraryDependencies ++= Seq( Libraries.catsEffectKernel, + Libraries.catsEffectStd, Libraries.redisClient, Libraries.catsEffect % Test, Libraries.catsLaws % Test, diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/caching/CacheAccessor.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/caching/CacheAccessor.scala new file mode 100644 index 000000000..708d10137 --- /dev/null +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/caching/CacheAccessor.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2018-2021 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats.caching + +trait CacheAccessor[F[_], K, V] { + def get(key: K): F[V] + def put(key: K, value: V): F[Unit] + def evict(key: K): F[Unit] +} diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/caching/ClientSideCaching.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/caching/ClientSideCaching.scala new file mode 100644 index 000000000..3738f8d70 --- /dev/null +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/caching/ClientSideCaching.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2018-2021 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats.caching + +import cats.effect.kernel.{ Async, Resource } +import dev.profunktor.redis4cats.effect.TxExecutor +import io.lettuce.core.TrackingArgs +import io.lettuce.core.api.StatefulRedisConnection +import io.lettuce.core.support.caching.{ + CacheAccessor => JCacheAccessor, + CacheFrontend => JCacheFrontend, + ClientSideCaching => JClientSideCaching +} + +object ClientSideCaching { + + def make[F[_]: Async, K, V]( + connection: StatefulRedisConnection[K, V], + args: TrackingArgs, + cacheAccessor: CacheAccessor[F, K, V] + ): Resource[F, JCacheFrontend[K, V]] = + TxExecutor.make[F].flatMap { redisExecutor => + Resource.make[F, JCacheFrontend[K, V]] { + Async[F].delay { + JClientSideCaching.enable( + new JCacheAccessor[K, V] { + override def get(key: K): V = redisExecutor.unsafeRun(cacheAccessor.get(key)) + override def put(key: K, value: V): Unit = redisExecutor.unsafeRun(cacheAccessor.put(key, value)) + override def evict(key: K): Unit = redisExecutor.unsafeRun(cacheAccessor.evict(key)) + }, + connection, + args + ) + } + }(cacheFrontend => Async[F].delay(cacheFrontend.close())) + } +} diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/TxExecutor.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/TxExecutor.scala index d5704c1d9..31dc5ed36 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/TxExecutor.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/effect/TxExecutor.scala @@ -29,26 +29,30 @@ import scala.util.control.NonFatal import cats.effect.kernel._ import cats.syntax.all._ +import cats.effect.std.Dispatcher private[redis4cats] trait TxExecutor[F[_]] { def delay[A](thunk: => A): F[A] def eval[A](fa: F[A]): F[A] def start[A](fa: F[A]): F[Fiber[F, Throwable, A]] - def liftK[G[_]: Async]: TxExecutor[G] + def liftK[G[_]: Async: Dispatcher]: TxExecutor[G] + def unsafeRun[A](fa: F[A]): A } private[redis4cats] object TxExecutor { def make[F[_]: Async]: Resource[F, TxExecutor[F]] = - Resource - .make(Sync[F].delay(Executors.newFixedThreadPool(1, TxThreadFactory))) { ec => - Sync[F] - .delay(ec.shutdownNow()) - .ensure(new IllegalStateException("There were outstanding tasks at time of shutdown of the Redis thread"))( - _.isEmpty - ) - .void - } - .map(es => fromEC(exitOnFatal(ExecutionContext.fromExecutorService(es)))) + Dispatcher.parallel[F].flatMap { dispatcher => + Resource + .make(Sync[F].delay(Executors.newFixedThreadPool(1, TxThreadFactory))) { ec => + Sync[F] + .delay(ec.shutdownNow()) + .ensure(new IllegalStateException("There were outstanding tasks at time of shutdown of the Redis thread"))( + _.isEmpty + ) + .void + } + .map(es => fromEC(exitOnFatal(ExecutionContext.fromExecutorService(es)))(dispatcher)) + } private def exitOnFatal(ec: ExecutionContext): ExecutionContext = new ExecutionContext { def execute(r: Runnable): Unit = @@ -70,11 +74,12 @@ private[redis4cats] object TxExecutor { ec.reportFailure(t) } - private def fromEC[F[_]: Async](ec: ExecutionContext): TxExecutor[F] = + private def fromEC[F[_]: Async](ec: ExecutionContext)(dispatcher: Dispatcher[F]): TxExecutor[F] = new TxExecutor[F] { def delay[A](thunk: => A): F[A] = eval(Sync[F].delay(thunk)) def eval[A](fa: F[A]): F[A] = Async[F].evalOn(fa, ec) def start[A](fa: F[A]): F[Fiber[F, Throwable, A]] = Async[F].startOn(fa, ec) - def liftK[G[_]: Async]: TxExecutor[G] = fromEC[G](ec) + def liftK[G[_]: Async: Dispatcher]: TxExecutor[G] = fromEC[G](ec)(implicitly) + def unsafeRun[A](fa: F[A]): A = dispatcher.unsafeRunSync(fa) } } diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/tx/TxRunner.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/tx/TxRunner.scala index 7e08cbe09..87492bd77 100644 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/tx/TxRunner.scala +++ b/modules/core/src/main/scala/dev/profunktor/redis4cats/tx/TxRunner.scala @@ -18,8 +18,8 @@ package dev.profunktor.redis4cats.tx import cats.effect.kernel._ import cats.effect.kernel.syntax.all._ +import cats.effect.std.Dispatcher import cats.syntax.all._ - import dev.profunktor.redis4cats.effect.TxExecutor private[redis4cats] trait TxRunner[F[_]] { @@ -30,7 +30,7 @@ private[redis4cats] trait TxRunner[F[_]] { )( fs: TxStore[F, String, A] => List[F[Unit]] ): F[Map[String, A]] - def liftK[G[_]: Async]: TxRunner[G] + def liftK[G[_]: Async: Dispatcher]: TxRunner[G] } private[redis4cats] object TxRunner { @@ -60,6 +60,6 @@ private[redis4cats] object TxRunner { } *> store.get } - def liftK[G[_]: Async]: TxRunner[G] = make[G](t.liftK[G]) + def liftK[G[_]: Async: Dispatcher]: TxRunner[G] = make[G](t.liftK[G]) } } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/commands.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/commands.scala index f685e6fdd..e8fb79413 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/commands.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/commands.scala @@ -18,6 +18,7 @@ package dev.profunktor.redis4cats import algebra._ import cats.effect.kernel.Async +import cats.effect.std.Dispatcher import dev.profunktor.redis4cats.effect.Log trait RedisCommands[F[_], K, V] @@ -39,7 +40,7 @@ trait RedisCommands[F[_], K, V] object RedisCommands { implicit class LiftKOps[F[_], K, V](val cmd: RedisCommands[F, K, V]) extends AnyVal { - def liftK[G[_]: Async: Log]: RedisCommands[G, K, V] = + def liftK[G[_]: Async: Dispatcher: Log]: RedisCommands[G, K, V] = cmd.asInstanceOf[BaseRedis[F, K, V]].liftK[G] } } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala index c466deeed..46053cd18 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala @@ -19,6 +19,7 @@ package dev.profunktor.redis4cats import cats._ import cats.data.NonEmptyList import cats.effect.kernel._ +import cats.effect.std.Dispatcher import cats.syntax.all._ import dev.profunktor.redis4cats.algebra.BitCommandOperation import dev.profunktor.redis4cats.algebra.BitCommandOperation.Overflows @@ -448,7 +449,7 @@ private[redis4cats] class BaseRedis[F[_]: FutureLift: MonadThrow: Log, K, V]( ) extends RedisCommands[F, K, V] with RedisConversionOps { - def liftK[G[_]: Async: Log]: RedisCommands[G, K, V] = + def liftK[G[_]: Async: Dispatcher: Log]: RedisCommands[G, K, V] = new BaseRedis[G, K, V](conn.liftK[G], tx.liftK[G], cluster) import dev.profunktor.redis4cats.JavaConversions._ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0b8db8f0a..dfe169e8a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -26,6 +26,7 @@ object Dependencies { def log4cats(artifact: String): ModuleID = "org.typelevel" %% s"log4cats-$artifact" % V.log4cats val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % V.catsEffect + val catsEffectStd = "org.typelevel" %% "cats-effect-std" % V.catsEffect val fs2Core = "co.fs2" %% "fs2-core" % V.fs2 val keyPool = "org.typelevel" %% "keypool" % V.keyPool