Skip to content

Commit

Permalink
refactor: adjust the code of service
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Dec 30, 2024
1 parent fba764b commit 788e86e
Show file tree
Hide file tree
Showing 21 changed files with 1,038 additions and 471 deletions.
7 changes: 4 additions & 3 deletions src/acme/lets_encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::config::{
};
use crate::http_extra::HttpResponse;
use crate::proxy::try_update_certificates;
use crate::service::Error as ServiceError;
use crate::service::SimpleServiceTaskFuture;
use crate::state::State;
use crate::util;
Expand Down Expand Up @@ -67,7 +68,7 @@ async fn update_certificate_lets_encrypt(
async fn do_update_certificates(
count: u32,
params: Vec<(String, Vec<String>)>,
) -> Result<bool, String> {
) -> Result<bool, ServiceError> {
const UPDATE_INTERVAL: u32 = 10;
if count % UPDATE_INTERVAL != 0 {
return Ok(false);
Expand All @@ -86,10 +87,10 @@ async fn do_update_certificates(
};
needs_renewal || domains_changed
},
Err(err) => {
Err(e) => {
error!(
category = LOG_CATEGORY,
error = %err,
error = %e,
name = name,
"failed to get certificate"
);
Expand Down
174 changes: 111 additions & 63 deletions src/cache/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,51 @@ pub struct FileCache {
cache: Option<TinyUfo<String, CacheObject>>,
}

/// File cache parameters
#[derive(Debug, Clone)]
struct FileCacheParams {
/// Cache directory
directory: String,
/// Max reading count
reading_max: u32,
/// Max writing count
writing_max: u32,
/// Max tinyufo cache size
cache_max: usize,
}

impl Default for FileCacheParams {
fn default() -> Self {
Self {
directory: String::new(),
reading_max: 10_000,
writing_max: 1_000,
cache_max: 100,
}
}
}

fn parse_params(dir: &str) -> FileCacheParams {
let mut reading_max = 10 * 1000;
let mut writing_max = 1000;
let mut cache_max = 100;
let dir = if let Some((dir, query)) = dir.split_once('?') {
let (dir, query) = dir.split_once('?').unwrap_or((dir, ""));
let mut params = FileCacheParams::default();

if !query.is_empty() {
let m = util::convert_query_map(query);
if let Some(max) = m.get("reading_max") {
reading_max = max.parse::<u32>().unwrap_or(reading_max);
}
if let Some(max) = m.get("writing_max") {
writing_max = max.parse::<u32>().unwrap_or(writing_max);
}
if let Some(value) = m.get("cache_max") {
cache_max = value.parse::<usize>().unwrap_or(cache_max);
}
util::resolve_path(dir)
} else {
util::resolve_path(dir)
};
FileCacheParams {
directory: dir,
reading_max,
writing_max,
cache_max,
params.reading_max = m
.get("reading_max")
.and_then(|v| v.parse().ok())
.unwrap_or(params.reading_max);
params.writing_max = m
.get("writing_max")
.and_then(|v| v.parse().ok())
.unwrap_or(params.writing_max);
params.cache_max = m
.get("cache_max")
.and_then(|v| v.parse().ok())
.unwrap_or(params.cache_max);
}
params.directory = util::resolve_path(dir);
params
}

/// Create a file cache and use tinyufo for hotspot data caching
Expand Down Expand Up @@ -112,28 +125,50 @@ pub fn new_file_cache(dir: &str) -> Result<FileCache> {
})
}

impl FileCache {
#[inline]
fn get_file_path(&self, key: &str, namespace: &str) -> std::path::PathBuf {
if namespace.is_empty() {
Path::new(&self.directory).join(key)
} else {
Path::new(&self.directory).join(format!("{namespace}/{key}"))
}
}
}

#[async_trait]
impl HttpCacheStorage for FileCache {
/// Get cache object from tinyufo,
/// if not exists, then get from the file.
/// Retrieves a cache object by key and namespace.
///
/// First checks the in-memory TinyUfo cache, then falls back to file system if not found.
/// Enforces a maximum concurrent reading limit.
///
/// # Arguments
/// * `key` - The cache key
/// * `namespace` - Optional namespace to organize cache entries
///
/// # Returns
/// * `Ok(Some(CacheObject))` - If cache entry is found and valid
/// * `Ok(None)` - If entry doesn't exist or is invalid
/// * `Err(Error::OverQuota)` - If max concurrent reads exceeded
/// * `Err(Error::Io)` - On file system errors
async fn get(
&self,
key: &str,
namespace: &str,
) -> Result<Option<CacheObject>> {
debug!(key, namespace, "get cache from file");
if let Some(Some(obj)) =
self.cache.as_ref().map(|c| c.get(&key.to_string()))
{
return Ok(Some(obj));
// Early return if found in cache
if let Some(cache) = &self.cache {
if let Some(obj) = cache.get(&key.to_string()) {
return Ok(Some(obj));
}
}

#[cfg(feature = "full")]
let start = SystemTime::now();
let file = if namespace.is_empty() {
Path::new(&self.directory).join(key)
} else {
Path::new(&self.directory).join(format!("{namespace}/{key}"))
};
let file = self.get_file_path(key, namespace);

// add reading count
let count = self.reading.fetch_add(1, Ordering::Relaxed);
defer!(self.reading.fetch_sub(1, Ordering::Relaxed););
Expand All @@ -146,23 +181,28 @@ impl HttpCacheStorage for FileCache {
let result = fs::read(file).await;
#[cfg(feature = "full")]
self.read_time.observe(util::elapsed_second(start));
let buf = match result {
Ok(buf) => Ok(buf),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Ok(vec![])
} else {
Err(Error::Io { source: e })
}

match result {
Ok(buf) if buf.len() >= 8 => {
Ok(Some(CacheObject::from(Bytes::from(buf))))
},
}?;
if buf.len() < 8 {
Ok(None)
} else {
Ok(Some(CacheObject::from(Bytes::from(buf))))
Ok(_) => Ok(None),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(Error::Io { source: e }),
}
}
/// Put cache object to tinyufo and file.
/// Stores a cache object both in TinyUfo cache and on disk.
///
/// # Arguments
/// * `key` - The cache key
/// * `namespace` - Optional namespace to organize cache entries
/// * `data` - The cache object to store
/// * `weight` - Weight hint for the TinyUfo cache
///
/// # Returns
/// * `Ok(())` - On successful storage
/// * `Err(Error::OverQuota)` - If max concurrent writes exceeded
/// * `Err(Error::Io)` - On file system errors
async fn put(
&self,
key: &str,
Expand All @@ -177,11 +217,7 @@ impl HttpCacheStorage for FileCache {
#[cfg(feature = "full")]
let start = SystemTime::now();
let buf: Bytes = data.into();
let file = if namespace.is_empty() {
Path::new(&self.directory).join(key)
} else {
Path::new(&self.directory).join(format!("{namespace}/{key}"))
};
let file = self.get_file_path(key, namespace);
// add writing count
let count = self.writing.fetch_add(1, Ordering::Relaxed);
defer!(self.writing.fetch_sub(1, Ordering::Relaxed););
Expand All @@ -196,7 +232,15 @@ impl HttpCacheStorage for FileCache {
self.write_time.observe(util::elapsed_second(start));
result.map_err(|e| Error::Io { source: e })
}
/// Remove cache object from file, tinyufo doesn't support remove now.
/// Removes a cache entry from both TinyUfo and disk storage.
///
/// # Arguments
/// * `key` - The cache key to remove
/// * `namespace` - Optional namespace of the cache entry
///
/// # Returns
/// * `Ok(None)` - Always returns None as the removed object is not returned
/// * `Err(Error::Io)` - On file system errors
async fn remove(
&self,
key: &str,
Expand All @@ -206,34 +250,38 @@ impl HttpCacheStorage for FileCache {
if let Some(c) = &self.cache {
c.remove(&key.to_string());
}
let file = if namespace.is_empty() {
Path::new(&self.directory).join(key)
} else {
Path::new(&self.directory).join(format!("{namespace}/{key}"))
};
let file = self.get_file_path(key, namespace);
fs::remove_file(file)
.await
.map_err(|e| Error::Io { source: e })?;
Ok(None)
}
/// Get the stats of file cache
/// Returns current cache statistics.
///
/// # Returns
/// Statistics including current number of concurrent reads and writes
#[inline]
fn stats(&self) -> Option<HttpCacheStats> {
Some(HttpCacheStats {
reading: self.reading.load(Ordering::Relaxed),
writing: self.writing.load(Ordering::Relaxed),
})
}
/// Clears cache entries that were last accessed before the given timestamp.
///
/// # Arguments
/// * `access_before` - Remove entries last accessed before this time
///
/// # Returns
/// * `Ok((success, fail))` - Number of successfully and unsuccessfully removed entries
async fn clear(&self, access_before: SystemTime) -> Result<(i32, i32)> {
let mut success = 0;
let mut fail = 0;
for entry in WalkDir::new(&self.directory)
.into_iter()
.filter_map(|e| e.ok())
.filter_map(|item| item.ok())
.filter(|item| !item.path().is_dir())
{
if entry.path().is_dir() {
continue;
}
let Ok(metadata) = entry.metadata() else {
continue;
};
Expand Down
Loading

0 comments on commit 788e86e

Please sign in to comment.