From d5a87cd3b644ad9f5a299068c578d87beb7f2c35 Mon Sep 17 00:00:00 2001 From: Bin Tang Date: Tue, 18 Oct 2022 15:36:53 +0800 Subject: [PATCH] storage: add mirror health checking support Currently, the mirror is set to unavailable if the failed times reach failure_limit. We added mirror health checking, which will recover unavailable mirror servers. The failure_limit indicates the failure count at which the mirror is set to unavailable, and the health_check_interval indicates the time interval at which we try to recover the unavailable mirror. Signed-off-by: Bin Tang --- api/src/http.rs | 19 ++- storage/src/backend/connection.rs | 201 ++++++++++++++++-------------- 2 files changed, 127 insertions(+), 93 deletions(-) diff --git a/api/src/http.rs b/api/src/http.rs index ccc7a072135..03026bcdebc 100644 --- a/api/src/http.rs +++ b/api/src/http.rs @@ -400,18 +400,33 @@ impl Default for ProxyConfig { /// Configuration for mirror. #[derive(Clone, Deserialize, Serialize, Debug)] +#[serde(default)] pub struct MirrorConfig { /// Mirror server URL, for example http://127.0.0.1:65001. pub host: String, /// HTTP request headers to be passed to mirror server. - #[serde(default)] pub headers: HashMap, /// Whether the authorization process is through mirror? default false. /// true: authorization through mirror, e.g. Using normal registry as mirror. /// false: authorization through original registry, /// e.g. when using Dragonfly server as mirror, authorization through it will affect performance. - #[serde(default)] pub auth_through: bool, + /// Interval of mirror health checking, in seconds. + pub health_check_interval: u64, + /// Failure count for which mirror is considered unavailable.
+ pub failure_limit: u8, +} + +impl Default for MirrorConfig { + fn default() -> Self { + Self { + host: String::new(), + headers: HashMap::new(), + auth_through: false, + health_check_interval: 5, + failure_limit: 5, + } + } } #[derive(Debug)] diff --git a/storage/src/backend/connection.rs b/storage/src/backend/connection.rs index b81f9bc73d9..de4e13c5af7 100644 --- a/storage/src/backend/connection.rs +++ b/storage/src/backend/connection.rs @@ -12,7 +12,6 @@ use std::sync::Arc; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use arc_swap::ArcSwapOption; use log::{max_level, Level}; use reqwest::header::{HeaderName, HeaderValue}; @@ -223,15 +222,8 @@ pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult, - mirror_state: MirrorState, - shutdown: AtomicBool, -} - -#[derive(Debug)] -pub(crate) struct MirrorState { mirrors: Vec>, - /// Current mirror, None if there is no mirror available. - current: ArcSwapOption, + shutdown: AtomicBool, } #[derive(Debug)] @@ -242,7 +234,7 @@ pub(crate) struct Mirror { status: AtomicBool, /// Failed times requesting mirror, the status will be marked as false when failed_times = failure_limit. failed_times: AtomicU8, - /// Failed limit for mirror. + /// Failure count for which mirror is considered unavailable. 
failure_limit: u8, } @@ -294,23 +286,16 @@ impl Connection { mirrors.push(Arc::new(Mirror { config: mirror_config.clone(), status: AtomicBool::from(true), - // Maybe read from configuration file - failure_limit: 5, failed_times: AtomicU8::from(0), + failure_limit: mirror_config.failure_limit, })); } } - let current = if let Some(first_mirror) = mirrors.first() { - ArcSwapOption::from(Some(first_mirror.clone())) - } else { - ArcSwapOption::from(None) - }; - let connection = Arc::new(Connection { client, proxy, - mirror_state: MirrorState { mirrors, current }, + mirrors, shutdown: AtomicBool::new(false), }); @@ -363,7 +348,50 @@ impl Connection { }); } } - // TODO: check mirrors' health + // Check mirrors' health + for mirror in connection.mirrors.iter() { + let conn = connection.clone(); + let connect_timeout = config.connect_timeout; + // Spawn thread to update the health status of mirror server + let mirror_cloned = mirror.clone(); + thread::spawn(move || { + let mirror_health_url = format!("{}/v2", mirror_cloned.config.host); + + loop { + // Try to recover the mirror server when it is unavailable. + if !mirror_cloned.status.load(Ordering::Relaxed) { + info!( + "Mirror {} unhealthy, try to recover", + mirror_cloned.config.host + ); + let client = Client::new(); + let _ = client + .get(mirror_health_url.as_str()) + .timeout(Duration::from_secs(connect_timeout as u64)) + .send() + .map(|resp| { + // If the response status is less than StatusCode::INTERNAL_SERVER_ERROR, + // the mirror server is recovered. 
+ if resp.status() < StatusCode::INTERNAL_SERVER_ERROR { + info!("Mirror {} recovered", mirror_cloned.config.host); + mirror_cloned.failed_times.store(0, Ordering::Relaxed); + mirror_cloned.status.store(true, Ordering::Relaxed); + } + }); + } + + if conn.shutdown.load(Ordering::Acquire) { + break; + } + thread::sleep(Duration::from_secs( + mirror_cloned.config.health_check_interval, + )); + if conn.shutdown.load(Ordering::Acquire) { + break; + } + } + }); + } Ok(connection) } @@ -449,92 +477,83 @@ impl Connection { } } - let current_mirror = self.mirror_state.current.load(); - - if let Some(mirror) = current_mirror.as_ref() { - // With configuration `auth_through` disabled, we should not intend to send authentication - // request to mirror. Mainly because mirrors like P2P/Dragonfly has a poor performance when - // relaying non-data requests. But it's still possible that ever returned token is expired. - // So mirror might still respond us with status code UNAUTHORIZED, which should be handle - // by sending authentication request to the original registry. - // - // - For non-authentication request with token in request header, handle is as usual requests to registry. - // This request should already take token in header. - // - For authentication request - // 1. auth_through is disabled(false): directly pass below mirror translations and jump to original registry handler. - // 2. auth_through is enabled(true): try to get authenticated from mirror and should also handle status code UNAUTHORIZED. 
- if mirror.config.auth_through - || !requesting_auth && headers.contains_key(HEADER_AUTHORIZATION) - { - let data_cloned = data.as_ref().cloned(); - - for (key, value) in mirror.config.headers.iter() { - headers.insert( - HeaderName::from_str(key).unwrap(), - HeaderValue::from_str(value).unwrap(), - ); + if !self.mirrors.is_empty() { + let mut requested_all_mirrors = true; + for mirror in self.mirrors.iter() { + // With configuration `auth_through` disabled, we should not intend to send authentication + // request to mirror. Mainly because mirrors like P2P/Dragonfly has a poor performance when + // relaying non-data requests. But it's still possible that ever returned token is expired. + // So mirror might still respond us with status code UNAUTHORIZED, which should be handle + // by sending authentication request to the original registry. + // + // - For non-authentication request with token in request header, handle is as usual requests to registry. + // This request should already take token in header. + // - For authentication request + // 1. auth_through is disabled(false): directly pass below mirror translations and jump to original registry handler. + // 2. auth_through is enabled(true): try to get authenticated from mirror and should also handle status code UNAUTHORIZED. + if !mirror.config.auth_through + && (!headers.contains_key(HEADER_AUTHORIZATION) || requesting_auth) + { + requested_all_mirrors = false; + break; } - let current_url = mirror.mirror_url(url)?; - debug!("mirror server url {}", current_url); + if mirror.status.load(Ordering::Relaxed) { + let data_cloned = data.as_ref().cloned(); - let result = self.call_inner( - &self.client, - method.clone(), - current_url.as_str(), - &query, - data_cloned, - headers, - catch_status, - false, - ); - - match result { - Ok(resp) => { - // If the response status >= INTERNAL_SERVER_ERROR, fall back to original registry. 
- if resp.status() < StatusCode::INTERNAL_SERVER_ERROR { - return Ok(resp); - } - } - Err(err) => { - warn!( - "request mirror server failed, mirror: {:?}, error: {:?}", - mirror, err + for (key, value) in mirror.config.headers.iter() { + headers.insert( + HeaderName::from_str(key).unwrap(), + HeaderValue::from_str(value).unwrap(), ); - mirror.failed_times.fetch_add(1, Ordering::Relaxed); + } - if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit { + let current_url = mirror.mirror_url(url)?; + debug!("mirror server url {}", current_url); + + let result = self.call_inner( + &self.client, + method.clone(), + current_url.as_str(), + &query, + data_cloned, + headers, + catch_status, + false, + ); + + match result { + Ok(resp) => { + // If the response status >= INTERNAL_SERVER_ERROR, move to the next mirror server. + if resp.status() < StatusCode::INTERNAL_SERVER_ERROR { + return Ok(resp); + } + } + Err(err) => { warn!( - "reach to failure limit {}, disable mirror: {:?}", - mirror.failure_limit, mirror + "request mirror server failed, mirror: {:?}, error: {:?}", + mirror, err ); - mirror.status.store(false, Ordering::Relaxed); - - let mut idx = 0; - loop { - if idx == self.mirror_state.mirrors.len() { - break None; - } - let m = &self.mirror_state.mirrors[idx]; - if m.status.load(Ordering::Relaxed) { - warn!("mirror server has been changed to {:?}", m); - break Some(m); - } - - idx += 1; + mirror.failed_times.fetch_add(1, Ordering::Relaxed); + + if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit { + warn!( + "reach to failure limit {}, disable mirror: {:?}", + mirror.failure_limit, mirror + ); + mirror.status.store(false, Ordering::Relaxed); } - .map(|m| self.mirror_state.current.store(Some(m.clone()))) - .unwrap_or_else(|| self.mirror_state.current.store(None)); } } } - // FIXME: Move to the next available mirror directly when mirror health checking is available. 
- warn!("Failed to request mirror server, fallback to original server."); - // Remove mirror-related headers to avoid sending them to the original registry. + // Remove mirror-related headers to avoid sending them to the next mirror server and original registry. for (key, _) in mirror.config.headers.iter() { headers.remove(HeaderName::from_str(key).unwrap()); } } + if requested_all_mirrors { + warn!("Request to all mirror server failed, fallback to original server."); + } } self.call_inner(