Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache any value & redis driver for cache #1217

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ storage_azure = ["opendal/services-azblob"]
storage_gcp = ["opendal/services-gcs"]
# Cache feature
cache_inmem = ["dep:moka"]
cache_redis = ["dep:bb8", "dep:redis", "dep:bb8-redis", "dep:rmp-serde"]
bg_redis = ["dep:rusty-sidekiq", "dep:bb8"]
bg_pg = ["dep:sqlx", "dep:ulid"]
bg_sqlt = ["dep:sqlx", "dep:ulid"]
Expand Down Expand Up @@ -157,6 +158,11 @@ ulid = { version = "1", optional = true }
rusty-sidekiq = { version = "0.11.0", default-features = false, optional = true }
bb8 = { version = "0.8.1", optional = true }

# redis cache driver
bb8-redis = { version = "0.20.0", optional = true }
redis = { version = "0.28.2", optional = true }
rmp-serde = { version = "1.3.0", optional = true }

scraper = { version = "0.21.0", features = ["deterministic"], optional = true }

[workspace.dependencies]
Expand Down
3 changes: 2 additions & 1 deletion src/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,14 @@ pub async fn create_context<H: Hooks>(
};

let queue_provider = bgworker::create_queue_provider(&config).await?;
let cache = cache::create_cache_provider(&config).await?;
let ctx = AppContext {
environment: environment.clone(),
#[cfg(feature = "with-db")]
db,
queue_provider,
storage: Storage::single(storage::drivers::null::new()).into(),
cache: cache::Cache::new(cache::drivers::null::new()).into(),
cache,
config,
mailer,
};
Expand Down
18 changes: 10 additions & 8 deletions src/cache/drivers/inmem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ use std::{

use async_trait::async_trait;
use moka::{sync::Cache, Expiry};

use serde::de::DeserializeOwned;
use serde::Serialize;
use super::CacheDriver;
use crate::cache::CacheResult;
use crate::config::InMemCacheConfig;

/// Creates a new instance of the in-memory cache driver, with a default Loco
/// configuration.
Expand All @@ -19,9 +21,9 @@ use crate::cache::CacheResult;
///
/// A boxed [`CacheDriver`] instance.
#[must_use]
pub fn new() -> Box<dyn CacheDriver> {
pub async fn new(config: &InMemCacheConfig) -> Box<dyn CacheDriver> {
let cache: Cache<String, (Expiration, String)> = Cache::builder()
.max_capacity(32 * 1024 * 1024)
.max_capacity(config.max_capacity)
.expire_after(InMemExpiry)
.build();
Inmem::from(cache)
Expand All @@ -30,7 +32,7 @@ pub fn new() -> Box<dyn CacheDriver> {
/// Represents the in-memory cache driver.
#[derive(Debug)]
pub struct Inmem {
cache: Cache<String, (Expiration, String)>,
cache: Cache<String, (Expiration, dyn Serialize)>,
}

impl Inmem {
Expand Down Expand Up @@ -61,7 +63,7 @@ impl CacheDriver for Inmem {
/// # Errors
///
/// Returns a `CacheError` if there is an error during the operation.
async fn get(&self, key: &str) -> CacheResult<Option<String>> {
async fn get<T: DeserializeOwned>(&self, key: &str) -> CacheResult<Option<T>> {
let result = self.cache.get(key);
match result {
None => Ok(None),
Expand All @@ -74,7 +76,7 @@ impl CacheDriver for Inmem {
/// # Errors
///
/// Returns a `CacheError` if there is an error during the operation.
async fn insert(&self, key: &str, value: &str) -> CacheResult<()> {
async fn insert<T: Serialize>(&self, key: &str, value: &T) -> CacheResult<()> {
self.cache.insert(
key.to_string(),
(Expiration::Never, Arc::new(value).to_string()),
Expand All @@ -89,10 +91,10 @@ impl CacheDriver for Inmem {
///
/// Returns a [`super::CacheError`] if there is an error during the
/// operation.
async fn insert_with_expiry(
async fn insert_with_expiry<T: Serialize>(
&self,
key: &str,
value: &str,
value: &T,
duration: Duration,
) -> CacheResult<()> {
self.cache.insert(
Expand Down
12 changes: 7 additions & 5 deletions src/cache/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
use std::time::Duration;

use async_trait::async_trait;

use serde::de::DeserializeOwned;
use serde::Serialize;
use super::CacheResult;

#[cfg(feature = "cache_inmem")]
pub mod inmem;
pub mod null;
pub mod redis;

/// Trait representing a cache driver.
#[async_trait]
Expand All @@ -28,15 +30,15 @@ pub trait CacheDriver: Sync + Send {
///
/// Returns a [`super::CacheError`] if there is an error during the
/// operation.
async fn get(&self, key: &str) -> CacheResult<Option<String>>;
async fn get<T: DeserializeOwned>(&self, key: &str) -> CacheResult<Option<T>>;

/// Inserts a key-value pair into the cache.
///
/// # Errors
///
/// Returns a [`super::CacheError`] if there is an error during the
/// operation.
async fn insert(&self, key: &str, value: &str) -> CacheResult<()>;
async fn insert<T: Serialize>(&self, key: &str, value: &T) -> CacheResult<()>;

/// Inserts a key-value pair into the cache that expires after the
/// specified duration.
Expand All @@ -45,10 +47,10 @@ pub trait CacheDriver: Sync + Send {
///
/// Returns a [`super::CacheError`] if there is an error during the
/// operation.
async fn insert_with_expiry(
async fn insert_with_expiry<T: Serialize>(
&self,
key: &str,
value: &str,
value: &T,
duration: Duration,
) -> CacheResult<()>;

Expand Down
180 changes: 180 additions & 0 deletions src/cache/drivers/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
//! # Redis Cache Driver
//!
//! This module implements a cache driver backed by a Redis server.
use std::{
sync::Arc,
time::{Duration, Instant},
};

use async_trait::async_trait;
use bb8::Pool;
use bb8_redis::RedisConnectionManager;
use redis::{cmd, AsyncCommands};
use serde::de::DeserializeOwned;
use serde::Serialize;
use super::CacheDriver;
use crate::cache::{CacheError, CacheResult};
use crate::config::RedisCacheConfig;

/// Creates a new instance of the Redis cache driver from the given
/// configuration, building a `bb8` async connection pool against the
/// configured Redis URI.
///
/// # Returns
///
/// A boxed [`CacheDriver`] instance backed by Redis.
///
/// # Errors
///
/// Returns a [`CacheError`] if the Redis URI is invalid or the
/// connection pool cannot be built.
pub async fn new(config: &RedisCacheConfig) -> CacheResult<Box<dyn CacheDriver>> {
    // NOTE(review): returning `Box<dyn CacheDriver>` requires the trait to
    // be object-safe; the generic `get<T>`/`insert<T>` methods on
    // `CacheDriver` may prevent that — confirm this compiles as a trait
    // object.
    let manager = RedisConnectionManager::new(config.uri.clone())
        .map_err(|e| CacheError::Any(Box::new(e)))?;
    // Convert the pool-build error explicitly, matching the line above,
    // instead of relying on a `From` conversion for the bb8 error type.
    let redis = Pool::builder()
        .build(manager)
        .await
        .map_err(|e| CacheError::Any(Box::new(e)))?;

    Ok(Redis::from(redis))
}

/// Represents the Redis cache driver, holding a `bb8` connection pool
/// to the Redis server.
pub struct Redis {
    // Shared async connection pool; each cache operation checks out one
    // connection for the duration of the command.
    redis: Pool<RedisConnectionManager>,
}

impl Redis {
    /// Constructs a new [`Redis`] driver from a given `bb8` connection
    /// pool.
    ///
    /// # Returns
    ///
    /// A boxed [`CacheDriver`] instance.
    #[must_use]
    pub fn from(redis: Pool<RedisConnectionManager>) -> Box<dyn CacheDriver> {
        Box::new(Self { redis })
    }
}

#[async_trait]
impl CacheDriver for Redis {
    /// Checks if a key exists in the cache.
    ///
    /// # Errors
    ///
    /// Returns a `CacheError` if there is an error during the operation.
    async fn contains_key(&self, key: &str) -> CacheResult<bool> {
        let mut connection = self.redis.get().await?;
        Ok(connection.exists(key).await?)
    }

    /// Retrieves a value from the cache based on the provided key,
    /// decoding the stored MessagePack bytes into `T`.
    ///
    /// # Errors
    ///
    /// Returns a `CacheError` if there is an error during the operation
    /// or if the stored bytes cannot be deserialized into `T`.
    async fn get<T: DeserializeOwned>(&self, key: &str) -> CacheResult<Option<T>> {
        let mut connection = self.redis.get().await?;
        let data: Option<Vec<u8>> = connection.get(key).await?;

        match data {
            Some(bytes) => {
                let value = rmp_serde::from_slice(&bytes)
                    .map_err(|e| CacheError::Any(Box::new(e)))?;
                Ok(Some(value))
            }
            None => Ok(None),
        }
    }

    /// Inserts a key-value pair into the cache, encoding the value as
    /// MessagePack. The entry never expires.
    ///
    /// # Errors
    ///
    /// Returns a `CacheError` if serialization or the Redis operation
    /// fails.
    async fn insert<T: Serialize>(&self, key: &str, value: &T) -> CacheResult<()> {
        let mut connection = self.redis.get().await?;
        let encoded = rmp_serde::to_vec(value)
            .map_err(|e| CacheError::Any(Box::new(e)))?;
        // Annotate the reply type: redis commands are generic over the
        // return value and cannot infer `()` from a discarded result.
        let _: () = connection.set(key, encoded).await?;
        Ok(())
    }

    /// Inserts a key-value pair into the cache that expires after the
    /// specified duration. Sub-second durations are truncated to whole
    /// seconds by `SETEX`.
    ///
    /// # Errors
    ///
    /// Returns a [`super::CacheError`] if there is an error during the
    /// operation.
    async fn insert_with_expiry<T: Serialize>(
        &self,
        key: &str,
        value: &T,
        duration: Duration,
    ) -> CacheResult<()> {
        let mut connection = self.redis.get().await?;
        let encoded = rmp_serde::to_vec(value)
            .map_err(|e| CacheError::Any(Box::new(e)))?;
        // BUGFIX: `set_ex` takes the TTL as `u64` seconds in redis 0.28;
        // the previous `as usize` cast did not type-check and was lossy.
        let _: () = connection.set_ex(key, encoded, duration.as_secs()).await?;
        Ok(())
    }

    /// Removes a key-value pair from the cache.
    ///
    /// # Errors
    ///
    /// Returns a `CacheError` if there is an error during the operation.
    async fn remove(&self, key: &str) -> CacheResult<()> {
        let mut connection = self.redis.get().await?;
        // BUGFIX: the command future must be awaited — previously the
        // un-awaited future was dropped and `DEL` was never sent to the
        // server.
        let _: () = connection.del(key).await?;
        Ok(())
    }

    /// Clears all key-value pairs from the cache.
    ///
    /// NOTE(review): `FLUSHALL` wipes every database on the Redis server,
    /// not only keys written by this driver — confirm that is intended.
    ///
    /// # Errors
    ///
    /// Returns a `CacheError` if there is an error during the operation.
    async fn clear(&self) -> CacheResult<()> {
        let mut connection = self.redis.get().await?;
        // BUGFIX: use the async query API; the blocking `query` cannot be
        // awaited and does not accept a pooled async connection.
        let _: () = cmd("flushall").query_async(&mut *connection).await?;

        Ok(())
    }
}

/// Expiry marker for cached entries: an entry either lives forever or
/// expires after a fixed duration.
///
/// NOTE(review): this type is not referenced anywhere in this module —
/// it appears to be carried over from the in-memory driver; confirm it
/// is still needed here.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Expiration {
    Never,
    AfterDuration(Duration),
}

impl Expiration {
    /// Returns the time-to-live as a [`Duration`], or `None` when the
    /// entry never expires.
    #[must_use]
    pub fn as_duration(&self) -> Option<Duration> {
        match *self {
            Self::Never => None,
            Self::AfterDuration(ttl) => Some(ttl),
        }
    }
}

#[cfg(test)]
mod tests {

    use super::*;

    // These tests need a live Redis server; they are placeholders until a
    // test harness (e.g. testcontainers) is wired up. They are marked
    // `#[ignore]` so a plain `cargo test` does not fail on the `todo!()`
    // panics.

    #[tokio::test]
    #[ignore = "requires a running Redis server"]
    async fn is_contains_key() {
        todo!()
    }

    #[tokio::test]
    #[ignore = "requires a running Redis server"]
    async fn can_get_key_value() {
        todo!()
    }

    #[tokio::test]
    #[ignore = "requires a running Redis server"]
    async fn can_remove_key() {
        todo!()
    }

    #[tokio::test]
    #[ignore = "requires a running Redis server"]
    async fn can_clear() {
        todo!()
    }
}
Loading
Loading