Skip to content

Commit 7ca2e86

Browse files
committed
#998 Client side caching
1 parent dd861e5 commit 7ca2e86

File tree

5 files changed

+94
-13
lines changed

5 files changed

+94
-13
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ val commonSettings = Seq(
4444
testFrameworks += new TestFramework("munit.Framework"),
4545
libraryDependencies ++= Seq(
4646
Libraries.catsEffectKernel,
47+
Libraries.catsEffectStd,
4748
Libraries.redisClient,
4849
Libraries.catsEffect % Test,
4950
Libraries.catsLaws % Test,
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2018-2021 ProfunKtor
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package dev.profunktor.redis4cats.caching
18+
19+
trait CacheAccessor[F[_], K, V] {
20+
def get(key: K): F[V]
21+
def put(key: K, value: V): F[Unit]
22+
def evict(key: K): F[Unit]
23+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2018-2021 ProfunKtor
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package dev.profunktor.redis4cats.caching
18+
19+
import cats.effect.{ ContextShift, Effect, Resource, Sync }
20+
import dev.profunktor.redis4cats.effect.RedisExecutor
21+
import io.lettuce.core.TrackingArgs
22+
import io.lettuce.core.api.StatefulRedisConnection
23+
import io.lettuce.core.support.caching.{
24+
CacheAccessor => JCacheAccessor,
25+
CacheFrontend => JCacheFrontend,
26+
ClientSideCaching => JClientSideCaching
27+
}
28+
29+
object ClientSideCaching {
30+
31+
def make[F[_]: ContextShift: Sync: Effect, K, V](
32+
connection: StatefulRedisConnection[K, V],
33+
args: TrackingArgs,
34+
cacheAccessor: CacheAccessor[F, K, V]
35+
): Resource[F, JCacheFrontend[K, V]] =
36+
RedisExecutor.make[F].flatMap { redisExecutor =>
37+
Resource.make[F, JCacheFrontend[K, V]] {
38+
Sync[F].delay {
39+
JClientSideCaching.enable(
40+
new JCacheAccessor[K, V] {
41+
override def get(key: K): V = redisExecutor.unsafeRun(cacheAccessor.get(key))
42+
override def put(key: K, value: V): Unit = redisExecutor.unsafeRun(cacheAccessor.put(key, value))
43+
override def evict(key: K): Unit = redisExecutor.unsafeRun(cacheAccessor.evict(key))
44+
},
45+
connection,
46+
args
47+
)
48+
}
49+
}(cacheFrontend => Sync[F].delay(cacheFrontend.close()))
50+
}
51+
}

modules/core/src/main/scala/dev/profunktor/redis4cats/effect/TxExecutor.scala

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,30 @@ import scala.util.control.NonFatal
2929

3030
import cats.effect.kernel._
3131
import cats.syntax.all._
32+
import cats.effect.std.Dispatcher
3233

3334
private[redis4cats] trait TxExecutor[F[_]] {
3435
def delay[A](thunk: => A): F[A]
3536
def eval[A](fa: F[A]): F[A]
3637
def start[A](fa: F[A]): F[Fiber[F, Throwable, A]]
37-
def liftK[G[_]: Async]: TxExecutor[G]
38+
def liftK[G[_]: Async: Dispatcher]: TxExecutor[G]
39+
def unsafeRun[A](fa: F[A]): A
3840
}
3941

4042
private[redis4cats] object TxExecutor {
4143
def make[F[_]: Async]: Resource[F, TxExecutor[F]] =
42-
Resource
43-
.make(Sync[F].delay(Executors.newFixedThreadPool(1, TxThreadFactory))) { ec =>
44-
Sync[F]
45-
.delay(ec.shutdownNow())
46-
.ensure(new IllegalStateException("There were outstanding tasks at time of shutdown of the Redis thread"))(
47-
_.isEmpty
48-
)
49-
.void
50-
}
51-
.map(es => fromEC(exitOnFatal(ExecutionContext.fromExecutorService(es))))
44+
Dispatcher.parallel[F].flatMap { dispatcher =>
45+
Resource
46+
.make(Sync[F].delay(Executors.newFixedThreadPool(1, TxThreadFactory))) { ec =>
47+
Sync[F]
48+
.delay(ec.shutdownNow())
49+
.ensure(new IllegalStateException("There were outstanding tasks at time of shutdown of the Redis thread"))(
50+
_.isEmpty
51+
)
52+
.void
53+
}
54+
.map(es => fromEC(exitOnFatal(ExecutionContext.fromExecutorService(es)))(dispatcher))
55+
}
5256

5357
private def exitOnFatal(ec: ExecutionContext): ExecutionContext = new ExecutionContext {
5458
def execute(r: Runnable): Unit =
@@ -70,11 +74,12 @@ private[redis4cats] object TxExecutor {
7074
ec.reportFailure(t)
7175
}
7276

73-
private def fromEC[F[_]: Async](ec: ExecutionContext): TxExecutor[F] =
77+
private def fromEC[F[_]: Async](ec: ExecutionContext)(dispatcher: Dispatcher[F]): TxExecutor[F] =
7478
new TxExecutor[F] {
7579
def delay[A](thunk: => A): F[A] = eval(Sync[F].delay(thunk))
7680
def eval[A](fa: F[A]): F[A] = Async[F].evalOn(fa, ec)
7781
def start[A](fa: F[A]): F[Fiber[F, Throwable, A]] = Async[F].startOn(fa, ec)
78-
def liftK[G[_]: Async]: TxExecutor[G] = fromEC[G](ec)
82+
def liftK[G[_]: Async: Dispatcher]: TxExecutor[G] = fromEC[G](ec)(implicitly)
83+
def unsafeRun[A](fa: F[A]): A = dispatcher.unsafeRunSync(fa)
7984
}
8085
}

project/Dependencies.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ object Dependencies {
2626
def log4cats(artifact: String): ModuleID = "org.typelevel" %% s"log4cats-$artifact" % V.log4cats
2727

2828
val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % V.catsEffect
29+
val catsEffectStd = "org.typelevel" %% "cats-effect-std" % V.catsEffect
2930
val fs2Core = "co.fs2" %% "fs2-core" % V.fs2
3031
val keyPool = "org.typelevel" %% "keypool" % V.keyPool
3132

0 commit comments

Comments
 (0)