Skip to content

Commit

Permalink
docs: add comments to service
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Jan 11, 2025
1 parent 15bb6be commit 2c2440d
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 19 deletions.
47 changes: 46 additions & 1 deletion src/service/auto_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ use std::time::Duration;
use tokio::time::interval;
use tracing::{debug, error, info};

/// Compares configurations and handles updates through hot reload or full restart
///
/// This function:
/// 1. Loads and validates the new configuration
/// 2. Compares it with current config to find differences
/// 3. Attempts hot reload for supported changes:
/// - Server locations
/// - Upstream configurations
/// - Location definitions
/// - Plugin configurations
/// - Certificates (except ACME/Let's Encrypt)
/// 4. Sends notifications for successful updates
/// 5. If hot_reload_only=false and there are non-hot-reloadable changes,
/// triggers a full server restart
async fn diff_and_update_config(
hot_reload_only: bool,
) -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -317,13 +331,24 @@ async fn diff_and_update_config(
Ok(())
}

/// AutoRestart service manages configuration updates on a schedule
///
/// The service alternates between hot reloads and full restarts based on:
/// - restart_unit: Determines frequency of full restarts vs hot reloads
/// - only_hot_reload: Forces hot reload only mode
/// - count: Tracks intervals to coordinate restart timing
struct AutoRestart {
/// How many intervals to wait before allowing a full restart (vs hot reload)
restart_unit: u32,
/// If true, only perform hot reloads and never restart
only_hot_reload: bool,
/// Tracks if currently performing a hot reload
running_hot_reload: AtomicBool,
/// Counter for tracking intervals
count: AtomicU32,
}

/// Creates a new auto-restart service that checks for config changes periodically
pub fn new_auto_restart_service(
interval: Duration,
only_hot_reload: bool,
Expand All @@ -345,8 +370,22 @@ pub fn new_auto_restart_service(
)
}

/// ConfigObserverService provides real-time config file monitoring
///
/// This service:
/// 1. Watches the config file/storage for changes
/// 2. Triggers immediate hot reload when changes detected
/// 3. Can optionally perform full restarts if needed
/// 4. Runs continuously until server shutdown
///
/// The service uses tokio::select! to handle:
/// - Periodic checks (interval-based)
/// - File system events (real-time)
/// - Graceful shutdown signals
pub struct ConfigObserverService {
/// How often to check for changes
interval: Duration,
/// If true, only perform hot reloads when changes detected
only_hot_reload: bool,
}

Expand Down Expand Up @@ -415,6 +454,8 @@ impl BackgroundService for ConfigObserverService {
}
}

/// Helper function to run the config diff and update process
/// Logs any errors that occur during the update
async fn run_diff_and_update_config(hot_reload_only: bool) {
if let Err(e) = diff_and_update_config(hot_reload_only).await {
error!(
Expand All @@ -429,11 +470,15 @@ async fn run_diff_and_update_config(hot_reload_only: bool) {
#[async_trait]
impl ServiceTask for AutoRestart {
async fn run(&self) -> Option<bool> {
// Calculate if this iteration should be hot reload only
// Uses modulo arithmetic with restart_unit to create a pattern like:
// [hot reload, hot reload, full restart, hot reload, hot reload, full restart]
// This helps spread out potentially disruptive full restarts
let count = self.count.fetch_add(1, Ordering::Relaxed);
let hot_reload_only = if self.only_hot_reload {
true
} else if count > 0 && self.restart_unit > 1 {
count % self.restart_unit != 0
count % self.restart_unit != 0 // Only do full restart when count divides evenly
} else {
true
};
Expand Down
140 changes: 134 additions & 6 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,44 @@ pub enum Error {
Invalid { message: String },
}

// Type alias for a boxed future that represents a background task
// Takes a u32 counter and returns Result<bool, Error>
pub type SimpleServiceTaskFuture =
Box<dyn Fn(u32) -> BoxFuture<'static, Result<bool, Error>> + Sync + Send>;

// Represents a collection of background tasks that run periodically
pub struct SimpleServiceTask {
name: String,
count: AtomicU32,
tasks: Vec<(String, SimpleServiceTaskFuture)>,
interval: Duration,
name: String, // Name identifier for the service
count: AtomicU32, // Counter for tracking task executions
tasks: Vec<(String, SimpleServiceTaskFuture)>, // List of named tasks to execute
interval: Duration, // Time between task executions
}

/// Creates a new SimpleServiceTask with the specified name, interval, and collection of tasks.
/// This service manages multiple background tasks that run concurrently at fixed intervals.
///
/// # Arguments
/// * `name` - Identifier for this service instance, used in logging
/// * `interval` - Duration between task executions (e.g., Duration::from_secs(60) for minute intervals)
/// * `tasks` - Vector of named tasks to execute periodically, where each task is a tuple of (name, task_function)
///
/// # Examples
/// ```
/// use std::time::Duration;
///
/// let tasks = vec![
/// ("cleanup".to_string(), Box::new(|count| Box::pin(async move {
/// // Perform cleanup operation
/// Ok(true)
/// })))
/// ];
///
/// let service = new_simple_service_task(
/// "maintenance",
/// Duration::from_secs(300), // Run every 5 minutes
/// tasks
/// );
/// ```
pub fn new_simple_service_task(
name: &str,
interval: Duration,
Expand All @@ -54,6 +82,22 @@ pub fn new_simple_service_task(

#[async_trait]
impl BackgroundService for SimpleServiceTask {
/// Starts the background service, executing all tasks at the specified interval
/// until shutdown is signaled or tasks complete. Each task execution is logged
/// with timing information and success/failure status.
///
/// # Arguments
/// * `shutdown` - Watch channel for shutdown coordination
///
/// # Task Execution
/// - Tasks are executed sequentially in the order they were added
/// - Each task receives a counter value that increments with each interval
/// - Failed tasks are logged with error details but don't stop the service
/// - Task execution times are logged for monitoring purposes
///
/// # Shutdown Behavior
/// - Service stops gracefully when shutdown signal is received
/// - Current task iteration completes before shutdown
async fn start(&self, mut shutdown: ShutdownWatch) {
let period_human: humantime::Duration = self.interval.into();
let task_names: Vec<String> =
Expand All @@ -69,14 +113,17 @@ impl BackgroundService for SimpleServiceTask {
let mut period = interval(self.interval);
loop {
tokio::select! {
// Handle shutdown signal
_ = shutdown.changed() => {
break;
}
// Execute tasks on each interval tick
_ = period.tick() => {
let now = SystemTime::now();
let count = self.count.fetch_add(1, Ordering::Relaxed);
let mut success_tasks = vec![];
let mut fail_tasks = vec![];
// Execute each task and track results
for (task_name, task) in self.tasks.iter() {
let task_start = SystemTime::now();
match task(count).await {
Expand Down Expand Up @@ -122,20 +169,84 @@ impl BackgroundService for SimpleServiceTask {
}
}

// Trait defining interface for individual service tasks
#[async_trait]
pub trait ServiceTask: Sync + Send {
/// Executes a single iteration of the task. This method is called repeatedly
/// at the specified interval until shutdown or task completion.
///
/// # Returns
/// * `None` or `Some(false)` - Task completed normally, continue running the service
/// * `Some(true)` - Task completed and requests service shutdown
///
/// # Examples
/// ```
/// #[async_trait]
/// impl ServiceTask for MyTask {
/// async fn run(&self) -> Option<bool> {
/// // Perform task work
/// if self.work_complete() {
/// Some(true) // Stop service
/// } else {
/// Some(false) // Continue running
/// }
/// }
/// }
/// ```
async fn run(&self) -> Option<bool>;

/// Returns a human-readable description of the task for logging and monitoring.
/// Implementations should provide meaningful descriptions of their purpose.
///
/// # Returns
/// * String describing the task's purpose, default is "unknown"
///
/// # Examples
/// ```
/// fn description(&self) -> String {
/// "Database cleanup task - removes expired records".to_string()
/// }
/// ```
fn description(&self) -> String {
"unknown".to_string()
}
}

// Wrapper for individual ServiceTask implementations
pub struct CommonServiceTask {
task: Box<dyn ServiceTask>,
interval: Duration,
task: Box<dyn ServiceTask>, // The actual task to execute
interval: Duration, // Time between executions
}

impl CommonServiceTask {
/// Creates a new CommonServiceTask that wraps a single task implementation.
/// This is useful for simpler cases where only one recurring task is needed.
///
/// # Arguments
/// * `interval` - Duration between task executions
/// * `task` - Implementation of ServiceTask to execute
///
/// # Special Cases
/// - If interval is less than 1 second, task runs only once
/// - Task can signal completion via return value to stop service
///
/// # Examples
/// ```
/// struct HealthCheck;
///
/// #[async_trait]
/// impl ServiceTask for HealthCheck {
/// async fn run(&self) -> Option<bool> {
/// // Perform health check
/// Some(false)
/// }
/// }
///
/// let service = CommonServiceTask::new(
/// Duration::from_secs(60),
/// HealthCheck
/// );
/// ```
pub fn new(interval: Duration, task: impl ServiceTask + 'static) -> Self {
Self {
task: Box::new(task),
Expand All @@ -146,6 +257,23 @@ impl CommonServiceTask {

#[async_trait]
impl BackgroundService for CommonServiceTask {
/// Starts the background service, executing the wrapped task at the specified interval.
/// The service runs until one of the following conditions is met:
/// - Shutdown signal is received
/// - Task returns Some(true) indicating completion
/// - Interval is less than 1 second (runs once and stops)
///
/// # Arguments
/// * `shutdown` - Watch channel for shutdown coordination
///
/// # Logging
/// - Service start is logged with task description and interval
/// - Each task execution is logged with elapsed time
/// - Task completion status is logged
///
/// # Performance Considerations
/// - Task execution time is measured and logged
/// - Long-running tasks may delay the next interval
async fn start(&self, mut shutdown: ShutdownWatch) {
let period_human: humantime::Duration = self.interval.into();
// if interval is less than 1s
Expand Down
38 changes: 26 additions & 12 deletions src/service/performance_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ use crate::service::SimpleServiceTaskFuture;
use crate::state::{get_process_system_info, get_processing_accepted};
use tracing::info;

// Service name constant for performance metrics logging
static PERFORMANCE_METRICS_LOG_SERVICE: &str = "performanceMetricsLog";

/// Creates a new service that periodically logs performance metrics
/// Returns a tuple of (service name, service task)
pub fn new_performance_metrics_log_service() -> (String, SimpleServiceTaskFuture)
{
let task: SimpleServiceTaskFuture = Box::new(move |_count: u32| {
Box::pin({
async move {
// Get cache statistics (reading/writing counts)
let mut cache_reading: i64 = -1;
let mut cache_writing: i64 = -1;
if let Ok(cache) = get_cache_backend() {
Expand All @@ -35,6 +39,9 @@ pub fn new_performance_metrics_log_service() -> (String, SimpleServiceTaskFuture
cache_writing = stats.writing as i64;
}
}

// Collect active location processing counts
// Format: "location1:count1, location2:count2, ..."
let locations_processing = get_locations_processing()
.into_iter()
.filter(|(_, count)| *count != 0)
Expand All @@ -46,15 +53,19 @@ pub fn new_performance_metrics_log_service() -> (String, SimpleServiceTaskFuture
} else {
Some(locations_processing)
};

// Collect upstream processing and connection counts
let mut upstreams_processing = vec![];
let mut upstreams_connected = vec![];
for (name, (processing, connected)) in
get_upstreams_processing_connected()
{
// Track non-zero processing counts
if processing != 0 {
upstreams_processing
.push(format!("{name}:{processing}"));
}
// Track non-zero connection counts
if let Some(connected) = connected {
if connected != 0 {
upstreams_connected
Expand All @@ -73,22 +84,25 @@ pub fn new_performance_metrics_log_service() -> (String, SimpleServiceTaskFuture
Some(upstreams_connected.join(", "))
};

// Get system metrics and request processing stats
let system_info = get_process_system_info();
let (processing, accepted) = get_processing_accepted();

// Log all metrics using the tracing framework
info!(
category = PERFORMANCE_METRICS_LOG_SERVICE,
threads = system_info.threads,
locations_processing,
upstreams_processing,
upstreams_connected,
accepted,
processing,
used_memory = system_info.memory,
fd_count = system_info.fd_count,
tcp_count = system_info.tcp_count,
tcp6_count = system_info.tcp6_count,
cache_reading,
cache_writing,
threads = system_info.threads, // Number of threads
locations_processing, // Active location requests
upstreams_processing, // Active upstream requests
upstreams_connected, // Active upstream connections
accepted, // Total accepted requests
processing, // Currently processing requests
used_memory = system_info.memory, // Memory usage
fd_count = system_info.fd_count, // File descriptor count
tcp_count = system_info.tcp_count, // IPv4 TCP connection count
tcp6_count = system_info.tcp6_count, // IPv6 TCP connection count
cache_reading, // Active cache reads
cache_writing, // Active cache writes
"performance metrics"
);
Ok(true)
Expand Down

0 comments on commit 2c2440d

Please sign in to comment.