Skip to content

Commit

Permalink
refactor: update upstream health check
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Jan 4, 2025
1 parent fc3414e commit 7fa172c
Showing 1 changed file with 47 additions and 35 deletions.
82 changes: 47 additions & 35 deletions src/proxy/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use async_trait::async_trait;
use derive_more::Debug;
use futures_util::FutureExt;
use once_cell::sync::Lazy;
use pingora::lb::selection::{Consistent, RoundRobin};
use pingora::lb::selection::{
BackendIter, BackendSelection, Consistent, RoundRobin,
};
use pingora::lb::{Backends, LoadBalancer};
use pingora::protocols::l4::ext::TcpKeepalive;
use pingora::protocols::ALPN;
Expand Down Expand Up @@ -223,6 +225,38 @@ fn get_hash_value(
}
}

fn update_health_check_params<S>(
mut lb: LoadBalancer<S>,
name: &str,
conf: &UpstreamConf,
) -> Result<LoadBalancer<S>>
where
S: BackendSelection + 'static,
S::Iter: BackendIter,
{
// For static discovery, perform immediate backend update
if is_static_discovery(&conf.guess_discovery()) {
lb.update()
.now_or_never()
.expect("static should not block")
.expect("static should not error");
}

// Set up health checking for the backends
let (hc, health_check_frequency) =
new_health_check(name, &conf.health_check.clone().unwrap_or_default())
.map_err(|e| Error::Common {
message: e.to_string(),
category: "health".to_string(),
})?;
// Configure health checking
lb.parallel_health_check = true;
lb.set_health_check(hc);
lb.update_frequency = conf.update_frequency;
lb.health_check_frequency = Some(health_check_frequency);
Ok(lb)
}

/// Creates a new load balancer instance based on the provided configuration
///
/// # Arguments
Expand Down Expand Up @@ -266,14 +300,6 @@ fn new_load_balancer(
discovery.as_str(),
)?;

// Set up health checking for the backends
let (hc, health_check_frequency) =
new_health_check(name, &conf.health_check.clone().unwrap_or_default())
.map_err(|e| Error::Common {
message: e.to_string(),
category: "health".to_string(),
})?;

// Parse the load balancing algorithm configuration
// Format: "algo:hash_type:hash_key" (e.g. "hash:cookie:session_id")
let algo_method = conf.algo.clone().unwrap_or_default();
Expand All @@ -284,43 +310,29 @@ fn new_load_balancer(
let lb = match algo_params[0] {
// Consistent hashing load balancer
"hash" => {
let mut lb = LoadBalancer::<Consistent>::from_backends(backends);
// Parse hash type and key if provided
if algo_params.len() > 1 {
hash = algo_params[1].to_string();
if algo_params.len() > 2 {
hash_key = algo_params[2].to_string();
}
}
// For static discovery, perform immediate backend update
if is_static_discovery(&discovery) {
lb.update()
.now_or_never()
.expect("static should not block")
.expect("static should not error");
}
// Configure health checking
lb.parallel_health_check = true;
lb.set_health_check(hc);
lb.update_frequency = conf.update_frequency;
lb.health_check_frequency = Some(health_check_frequency);
let lb = update_health_check_params(
LoadBalancer::<Consistent>::from_backends(backends),
name,
conf,
)?;

SelectionLb::Consistent(Arc::new(lb))
},
// Round robin load balancer (default)
_ => {
let mut lb = LoadBalancer::<RoundRobin>::from_backends(backends);
// For static discovery, perform immediate backend update
if is_static_discovery(&discovery) {
lb.update()
.now_or_never()
.expect("static should not block")
.expect("static should not error");
}
// Configure health checking
lb.parallel_health_check = true;
lb.set_health_check(hc);
lb.update_frequency = conf.update_frequency;
lb.health_check_frequency = Some(health_check_frequency);
let lb = update_health_check_params(
LoadBalancer::<RoundRobin>::from_backends(backends),
name,
conf,
)?;

SelectionLb::RoundRobin(Arc::new(lb))
},
};
Expand Down

0 comments on commit 7fa172c

Please sign in to comment.