Skip to content

Commit

Permalink
Merge pull request #155 from a-nickol/redis-connection-manager
Browse files Browse the repository at this point in the history
Additional `redis_connection_manager` for usage of `ConnectionManager` from `redis`
  • Loading branch information
jaemk authored Jun 3, 2023
2 parents 4018d94 + b2534a3 commit 00dc83f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ proc_macro = ["tokio", "cached_proc_macro", "cached_proc_macro_types"]
async = ["futures", "tokio", "async-trait"]
async_tokio_rt_multi_thread = ["async", "tokio/rt-multi-thread"]
redis_store = ["redis", "r2d2", "serde", "serde_json"]
redis_connection_manager = ["redis_store", "redis/connection-manager"]
redis_async_std = ["redis_store", "async", "redis/aio", "redis/async-std-comp", "redis/tls", "redis/async-std-tls-comp"]
redis_tokio = ["redis_store", "async", "redis/aio", "redis/tokio-comp", "redis/tls", "redis/tokio-native-tls-comp"]
redis_ahash = ["redis_store", "redis/ahash"]
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ of un-cached arguments, specify `#[cached(sync_writes = true)]` / `#[once(sync_w
- `redis_store`: Include Redis cache store
- `redis_async_std`: Include async Redis support using `async-std` and `async-std` tls support, implies `redis_store` and `async`
- `redis_tokio`: Include async Redis support using `tokio` and `tokio` tls support, implies `redis_store` and `async`
- `redis_connection_manager`: Enable the optional `connection-manager` feature of `redis`. Any async redis caches created
will use a connection manager instead of a `MultiplexedConnection`
- `redis_ahash`: Enable the optional `ahash` feature of `redis`
- `wasm`: Enable WASM support. Note that this feature is incompatible with `tokio`'s multi-thread
runtime (`async_tokio_rt_multi_thread`) and all Redis features (`redis_store`, `redis_async_std`, `redis_tokio`, `redis_ahash`)
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ of un-cached arguments, specify `#[cached(sync_writes = true)]` / `#[once(sync_w
- `redis_store`: Include Redis cache store
- `redis_async_std`: Include async Redis support using `async-std` and `async-std` tls support, implies `redis_store` and `async`
- `redis_tokio`: Include async Redis support using `tokio` and `tokio` tls support, implies `redis_store` and `async`
- `redis_connection_manager`: Enable the optional `connection-manager` feature of `redis`. Any async redis caches created
will use a connection manager instead of a `MultiplexedConnection`
- `redis_ahash`: Enable the optional `ahash` feature of `redis`
- `wasm`: Enable WASM support. Note that this feature is incompatible with `tokio`'s multi-thread
runtime (`async_tokio_rt_multi_thread`) and all Redis features (`redis_store`, `redis_async_std`, `redis_tokio`, `redis_ahash`)
Expand Down
35 changes: 29 additions & 6 deletions src/stores/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ mod async_redis {
}
}

/// Create a multiplexed redis connection. This is a single connection that can
/// be used asynchronously by multiple futures.
#[cfg(not(feature = "redis_connection_manager"))]
async fn create_multiplexed_connection(
&self,
) -> Result<redis::aio::MultiplexedConnection, RedisCacheBuildError> {
Expand All @@ -462,6 +465,19 @@ mod async_redis {
Ok(conn)
}

/// Create a multiplexed connection wrapped in a manager. The manager provides access
/// to a multiplexed connection and will automatically reconnect to the server when
/// necessary.
#[cfg(feature = "redis_connection_manager")]
async fn create_connection_manager(
&self,
) -> Result<redis::aio::ConnectionManager, RedisCacheBuildError> {
let s = self.connection_string()?;
let client = redis::Client::open(s)?;
let conn = redis::aio::ConnectionManager::new(client).await?;
Ok(conn)
}

/// The last step in building a `RedisCache` is to call `build()`
///
/// # Errors
Expand All @@ -472,7 +488,10 @@ mod async_redis {
seconds: self.seconds,
refresh: self.refresh,
connection_string: self.connection_string()?,
multiplexed_connection: self.create_multiplexed_connection().await?,
#[cfg(not(feature = "redis_connection_manager"))]
connection: self.create_multiplexed_connection().await?,
#[cfg(feature = "redis_connection_manager")]
connection: self.create_connection_manager().await?,
namespace: self.namespace,
prefix: self.prefix,
_phantom: PhantomData::default(),
Expand All @@ -483,14 +502,18 @@ mod async_redis {
/// Cache store backed by redis
///
/// Values have a ttl applied and enforced by redis.
/// Uses a `redis::aio::MultiplexedConnection` under the hood.
/// Uses a `redis::aio::MultiplexedConnection` or `redis::aio::ConnectionManager`
/// under the hood depending if feature `redis_connection_manager` is used or not.
pub struct AsyncRedisCache<K, V> {
pub(super) seconds: u64,
pub(super) refresh: bool,
pub(super) namespace: String,
pub(super) prefix: String,
connection_string: String,
multiplexed_connection: redis::aio::MultiplexedConnection,
#[cfg(not(feature = "redis_connection_manager"))]
connection: redis::aio::MultiplexedConnection,
#[cfg(feature = "redis_connection_manager")]
connection: redis::aio::ConnectionManager,
_phantom: PhantomData<(K, V)>,
}

Expand Down Expand Up @@ -526,7 +549,7 @@ mod async_redis {

/// Get a cached value
async fn cache_get(&self, key: &K) -> Result<Option<V>, Self::Error> {
let mut conn = self.multiplexed_connection.clone();
let mut conn = self.connection.clone();
let mut pipe = redis::pipe();
let key = self.generate_key(key);

Expand All @@ -551,7 +574,7 @@ mod async_redis {

/// Set a cached value
async fn cache_set(&self, key: K, val: V) -> Result<Option<V>, Self::Error> {
let mut conn = self.multiplexed_connection.clone();
let mut conn = self.connection.clone();
let mut pipe = redis::pipe();
let key = self.generate_key(&key);

Expand Down Expand Up @@ -582,7 +605,7 @@ mod async_redis {

/// Remove a cached value
async fn cache_remove(&self, key: &K) -> Result<Option<V>, Self::Error> {
let mut conn = self.multiplexed_connection.clone();
let mut conn = self.connection.clone();
let mut pipe = redis::pipe();
let key = self.generate_key(key);

Expand Down

0 comments on commit 00dc83f

Please sign in to comment.