diff --git a/api/src/http.rs b/api/src/http.rs
index ccc7a072135..c3dd3adfb03 100644
--- a/api/src/http.rs
+++ b/api/src/http.rs
@@ -400,6 +400,7 @@ impl Default for ProxyConfig {
 
 /// Configuration for mirror.
 #[derive(Clone, Deserialize, Serialize, Debug)]
+#[serde(default)]
 pub struct MirrorConfig {
     /// Mirror server URL, for example http://127.0.0.1:65001.
     pub host: String,
@@ -412,6 +413,28 @@ pub struct MirrorConfig {
     /// e.g. when using Dragonfly server as mirror, authorization through it will affect performance.
     #[serde(default)]
     pub auth_through: bool,
+    /// Interval of mirror health checking, in seconds.
+    #[serde(default = "default_check_interval")]
+    pub health_check_interval: u64,
+    /// Failure count after which the mirror is considered unavailable.
+    #[serde(default = "default_failure_limit")]
+    pub failure_limit: u8,
+    /// Ping URL to check mirror server health.
+    #[serde(default)]
+    pub ping_url: String,
+}
+
+impl Default for MirrorConfig {
+    fn default() -> Self {
+        Self {
+            host: String::new(),
+            headers: HashMap::new(),
+            auth_through: false,
+            health_check_interval: 5,
+            failure_limit: 5,
+            ping_url: String::new(),
+        }
+    }
 }
 
 #[derive(Debug)]
@@ -623,6 +646,14 @@ fn default_http_timeout() -> u32 {
     5
 }
 
+fn default_check_interval() -> u64 {
+    5
+}
+
+fn default_failure_limit() -> u8 {
+    5
+}
+
 fn default_work_dir() -> String {
     ".".to_string()
 }
diff --git a/docs/nydusd.md b/docs/nydusd.md
index ea24859f564..4ed02c61ab6 100644
--- a/docs/nydusd.md
+++ b/docs/nydusd.md
@@ -265,13 +265,21 @@ Currently, the mirror mode is only tested in the registry backend, and in theory
         // When Dragonfly does not cache data, nydusd will pull it from "X-Dragonfly-Registry".
         // If not set "X-Dragonfly-Registry", Dragonfly will pull data from proxy.registryMirror.url.
         "X-Dragonfly-Registry": "https://index.docker.io"
-      }
+      },
+      // This URL endpoint is used to check the health of the mirror server. If the mirror is unhealthy,
+      // the request will fall back to the next mirror or the original registry server.
+      // Use $host/v2 as default if left empty.
+      "ping_url": "http://127.0.0.1:40901/server/ping",
+      // Interval in seconds for checking and recovering an unavailable mirror. Use 5 as default if left empty.
+      "health_check_interval": 5,
+      // Failure count before disabling this mirror. Use 5 as default if left empty.
+      "failure_limit": 5
     },
     {
       "host": "http://dragonfly2.io:65001",
       "headers": {
         "X-Dragonfly-Registry": "https://index.docker.io"
-      }
+      },
     }
   ],
   ...
diff --git a/storage/src/backend/connection.rs b/storage/src/backend/connection.rs
index b81f9bc73d9..8ea2b31d24a 100644
--- a/storage/src/backend/connection.rs
+++ b/storage/src/backend/connection.rs
@@ -12,7 +12,6 @@ use std::sync::Arc;
 use std::thread;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
-use arc_swap::ArcSwapOption;
 use log::{max_level, Level};
 use reqwest::header::{HeaderName, HeaderValue};
 
@@ -222,27 +221,20 @@ pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult<Response> {
 #[derive(Debug)]
 pub(crate) struct Connection {
     client: Client,
-    proxy: Option<Proxy>,
-    mirror_state: MirrorState,
-    shutdown: AtomicBool,
-}
-
-#[derive(Debug)]
-pub(crate) struct MirrorState {
-    mirrors: Vec<Arc<Mirror>>,
-    /// Current mirror, None if there is no mirror available.
-    current: ArcSwapOption<Mirror>,
+    proxy: Option<Arc<Proxy>>,
+    pub mirrors: Vec<Arc<Mirror>>,
+    pub shutdown: AtomicBool,
 }
 
 #[derive(Debug)]
 pub(crate) struct Mirror {
     /// Information for mirror from configuration file.
-    config: MirrorConfig,
+    pub config: MirrorConfig,
     /// Mirror status, it will be set to false by atomic operation when mirror is not work.
     status: AtomicBool,
     /// Failed times requesting mirror, the status will be marked as false when failed_times = failure_limit.
     failed_times: AtomicU8,
-    /// Failed limit for mirror.
+    /// Failure count after which the mirror is considered unavailable.
     failure_limit: u8,
 }
 
@@ -277,13 +269,13 @@ impl Connection {
             } else {
                 None
             };
-            Some(Proxy {
+            Some(Arc::new(Proxy {
                 client: Self::build_connection(&config.proxy.url, config)?,
                 health: ProxyHealth::new(config.proxy.check_interval, ping_url),
                 fallback: config.proxy.fallback,
                 use_http: config.proxy.use_http,
                 replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET),
-            })
+            }))
         } else {
             None
         };
@@ -294,34 +286,34 @@ impl Connection {
                 mirrors.push(Arc::new(Mirror {
                     config: mirror_config.clone(),
                     status: AtomicBool::from(true),
-                    // Maybe read from configuration file
-                    failure_limit: 5,
                     failed_times: AtomicU8::from(0),
+                    failure_limit: mirror_config.failure_limit,
                 }));
             }
         }
 
-        let current = if let Some(first_mirror) = mirrors.first() {
-            ArcSwapOption::from(Some(first_mirror.clone()))
-        } else {
-            ArcSwapOption::from(None)
-        };
-
         let connection = Arc::new(Connection {
             client,
             proxy,
-            mirror_state: MirrorState { mirrors, current },
+            mirrors,
             shutdown: AtomicBool::new(false),
         });
 
-        if let Some(proxy) = &connection.proxy {
-            if proxy.health.ping_url.is_some() {
-                let conn = connection.clone();
-                let connect_timeout = config.connect_timeout;
+        // Start proxy's health checking thread.
+        connection.start_proxy_health_thread(config.connect_timeout as u64);
+        // Start mirrors' health checking thread.
+        connection.start_mirrors_health_thread(config.timeout as u64);
+
+        Ok(connection)
+    }
+
+    fn start_proxy_health_thread(&self, connect_timeout: u64) {
+        if let Some(proxy) = self.proxy.as_ref() {
+            if proxy.health.ping_url.is_some() {
+                let proxy = proxy.clone();
                 // Spawn thread to update the health status of proxy server
                 thread::spawn(move || {
-                    let proxy = conn.proxy.as_ref().unwrap();
                     let ping_url = proxy.health.ping_url.as_ref().unwrap();
 
                     let mut last_success = true;
@@ -352,20 +344,60 @@ impl Connection {
                             proxy.health.set(false)
                         });
 
-                        if conn.shutdown.load(Ordering::Acquire) {
-                            break;
-                        }
                         thread::sleep(proxy.health.check_interval);
-                        if conn.shutdown.load(Ordering::Acquire) {
-                            break;
-                        }
                     }
                 });
             }
         }
-
-        // TODO: check mirrors' health
+    }
 
-        Ok(connection)
+    fn start_mirrors_health_thread(&self, timeout: u64) {
+        for mirror in self.mirrors.iter() {
+            let mirror_cloned = mirror.clone();
+            thread::spawn(move || {
+                let mirror_health_url = if mirror_cloned.config.ping_url.is_empty() {
+                    format!("{}/v2", mirror_cloned.config.host)
+                } else {
+                    mirror_cloned.config.ping_url.clone()
+                };
+                info!("Mirror health checking url: {}", mirror_health_url);
+
+                let client = Client::new();
+                loop {
+                    // Try to recover the mirror server when it is unavailable.
+                    if !mirror_cloned.status.load(Ordering::Relaxed) {
+                        info!(
+                            "Mirror server {} unhealthy, try to recover",
+                            mirror_cloned.config.host
+                        );
+
+                        let _ = client
+                            .get(mirror_health_url.as_str())
+                            .timeout(Duration::from_secs(timeout as u64))
+                            .send()
+                            .map(|resp| {
+                                // If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
+                                // the mirror server is considered recovered.
+                                if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
+                                    info!("Mirror server {} recovered", mirror_cloned.config.host);
+                                    mirror_cloned.failed_times.store(0, Ordering::Relaxed);
+                                    mirror_cloned.status.store(true, Ordering::Relaxed);
+                                }
+                            })
+                            .map_err(|e| {
+                                warn!(
+                                    "Mirror server {} is not recovered: {}",
+                                    mirror_cloned.config.host, e
+                                );
+                            });
+                    }
+
+                    thread::sleep(Duration::from_secs(
+                        mirror_cloned.config.health_check_interval,
+                    ));
+                }
+            });
+        }
     }
 
     /// Shutdown the connection.
@@ -449,92 +481,83 @@ impl Connection {
             }
         }
 
-        let current_mirror = self.mirror_state.current.load();
-
-        if let Some(mirror) = current_mirror.as_ref() {
-            // With configuration `auth_through` disabled, we should not intend to send authentication
-            // request to mirror. Mainly because mirrors like P2P/Dragonfly has a poor performance when
-            // relaying non-data requests. But it's still possible that ever returned token is expired.
-            // So mirror might still respond us with status code UNAUTHORIZED, which should be handle
-            // by sending authentication request to the original registry.
-            //
-            // - For non-authentication request with token in request header, handle is as usual requests to registry.
-            //   This request should already take token in header.
-            // - For authentication request
-            //   1. auth_through is disabled(false): directly pass below mirror translations and jump to original registry handler.
-            //   2. auth_through is enabled(true): try to get authenticated from mirror and should also handle status code UNAUTHORIZED.
-            if mirror.config.auth_through
-                || !requesting_auth && headers.contains_key(HEADER_AUTHORIZATION)
-            {
-                let data_cloned = data.as_ref().cloned();
-
-                for (key, value) in mirror.config.headers.iter() {
-                    headers.insert(
-                        HeaderName::from_str(key).unwrap(),
-                        HeaderValue::from_str(value).unwrap(),
-                    );
+        if !self.mirrors.is_empty() {
+            let mut fallback_due_auth = false;
+            for mirror in self.mirrors.iter() {
+                // With configuration `auth_through` disabled, we should not send authentication
+                // requests to the mirror, mainly because mirrors like P2P/Dragonfly have poor performance
+                // when relaying non-data requests. But it's still possible that a previously returned
+                // token has expired, so the mirror might still respond with status code UNAUTHORIZED,
+                // which should be handled by sending an authentication request to the original registry.
+                //
+                // - For a non-authentication request with a token in the request header, handle it as a usual request to the registry.
+                //   Such a request should already carry the token in its header.
+                // - For an authentication request:
+                //   1. auth_through is disabled (false): skip the mirror translations below and jump to the original registry handler.
+                //   2. auth_through is enabled (true): try to authenticate through the mirror and also handle status code UNAUTHORIZED.
+                if !mirror.config.auth_through
+                    && (!headers.contains_key(HEADER_AUTHORIZATION) || requesting_auth)
+                {
+                    fallback_due_auth = true;
+                    break;
                 }
-                let current_url = mirror.mirror_url(url)?;
-                debug!("mirror server url {}", current_url);
-
-                let result = self.call_inner(
-                    &self.client,
-                    method.clone(),
-                    current_url.as_str(),
-                    &query,
-                    data_cloned,
-                    headers,
-                    catch_status,
-                    false,
-                );
+                if mirror.status.load(Ordering::Relaxed) {
+                    let data_cloned = data.as_ref().cloned();
 
-                match result {
-                    Ok(resp) => {
-                        // If the response status >= INTERNAL_SERVER_ERROR, fall back to original registry.
-                        if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
-                            return Ok(resp);
-                        }
-                    }
-                    Err(err) => {
-                        warn!(
-                            "request mirror server failed, mirror: {:?}, error: {:?}",
-                            mirror, err
+                    for (key, value) in mirror.config.headers.iter() {
+                        headers.insert(
+                            HeaderName::from_str(key).unwrap(),
+                            HeaderValue::from_str(value).unwrap(),
                         );
-                        mirror.failed_times.fetch_add(1, Ordering::Relaxed);
+                    }
+
+                    let current_url = mirror.mirror_url(url)?;
+                    debug!("mirror server url {}", current_url);
+
+                    let result = self.call_inner(
+                        &self.client,
+                        method.clone(),
+                        current_url.as_str(),
+                        &query,
+                        data_cloned,
+                        headers,
+                        catch_status,
+                        false,
+                    );
 
-                        if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
+                    match result {
+                        Ok(resp) => {
+                            // If the response status >= INTERNAL_SERVER_ERROR, move to the next mirror server.
+                            if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
+                                return Ok(resp);
+                            }
+                        }
+                        Err(err) => {
                             warn!(
-                                "reach to failure limit {}, disable mirror: {:?}",
-                                mirror.failure_limit, mirror
+                                "request mirror server failed, mirror: {:?}, error: {:?}",
+                                mirror, err
                             );
-                            mirror.status.store(false, Ordering::Relaxed);
-
-                            let mut idx = 0;
-                            loop {
-                                if idx == self.mirror_state.mirrors.len() {
-                                    break None;
-                                }
-                                let m = &self.mirror_state.mirrors[idx];
-                                if m.status.load(Ordering::Relaxed) {
-                                    warn!("mirror server has been changed to {:?}", m);
-                                    break Some(m);
-                                }
-
-                                idx += 1;
+                            mirror.failed_times.fetch_add(1, Ordering::Relaxed);
+
+                            if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
+                                warn!(
+                                    "reached the failure limit {}, disabling mirror: {:?}",
+                                    mirror.failure_limit, mirror
+                                );
+                                mirror.status.store(false, Ordering::Relaxed);
+                            }
                         }
-                            .map(|m| self.mirror_state.current.store(Some(m.clone())))
-                            .unwrap_or_else(|| self.mirror_state.current.store(None));
                     }
                 }
-            }
-            // FIXME: Move to the next available mirror directly when mirror health checking is available.
-            warn!("Failed to request mirror server, fallback to original server.");
-            // Remove mirror-related headers to avoid sending them to the original registry.
+                // Remove mirror-related headers to avoid sending them to the next mirror server and the original registry.
                 for (key, _) in mirror.config.headers.iter() {
                     headers.remove(HeaderName::from_str(key).unwrap());
                 }
             }
+            if !fallback_due_auth {
+                warn!("Requests to all mirror servers failed, falling back to the original server.");
+            }
         }
 
         self.call_inner(
diff --git a/storage/src/backend/registry.rs b/storage/src/backend/registry.rs
index 94e64ae2365..47835e6ac8e 100644
--- a/storage/src/backend/registry.rs
+++ b/storage/src/backend/registry.rs
@@ -5,8 +5,12 @@ //! Storage backend driver to access blobs on container image registry.
 use std::collections::HashMap;
 use std::io::{Error, Read, Result};
+use std::sync::atomic::Ordering;
 use std::sync::{Arc, RwLock};
+use std::thread;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
+use arc_swap::ArcSwapOption;
 use reqwest::blocking::Response;
 pub use reqwest::header::HeaderMap;
 use reqwest::header::{HeaderValue, CONTENT_LENGTH};
@@ -102,6 +106,12 @@ impl HashCache {
 #[derive(Clone, serde::Deserialize)]
 struct TokenResponse {
     token: String,
+    #[serde(default = "default_expires_in")]
+    expires_in: u64,
+}
+
+fn default_expires_in() -> u64 {
+    10 * 60
 }
 
 #[derive(Debug)]
@@ -110,7 +120,7 @@ struct BasicAuth {
     realm: String,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct BearerAuth {
     realm: String,
     service: String,
@@ -149,6 +159,11 @@ struct RegistryState {
     // Cache 30X redirect url
     // Example: RwLock<HashMap<"<blob_id>", "<redirected_url>">>
     cached_redirect: HashCache,
+
+    // The expiration time of the token, which is obtained from the registry server.
+    refresh_token_time: ArcSwapOption<u64>,
+    // Cache bearer auth for refreshing token.
+    cached_bearer_auth: ArcSwapOption<BearerAuth>,
 }
 
 impl RegistryState {
@@ -207,6 +222,19 @@ impl RegistryState {
                     e
                 ))
             })?;
+        if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
+            self.refresh_token_time
+                .store(Some(Arc::new(now_timestamp.as_secs() + ret.expires_in)));
+            info!(
+                "cached_bearer_auth: {:?}, next time: {}",
+                auth,
+                now_timestamp.as_secs() + ret.expires_in
+            );
+        }
+
+        // Cache bearer auth for refreshing token.
+        self.cached_bearer_auth.store(Some(Arc::new(auth)));
+
         Ok(ret.token)
     }
 
@@ -605,13 +633,27 @@ impl Registry {
             blob_url_scheme: config.blob_url_scheme,
             blob_redirected_host: config.blob_redirected_host,
             cached_redirect: HashCache::new(),
+            refresh_token_time: ArcSwapOption::new(None),
+            cached_bearer_auth: ArcSwapOption::new(None),
         });
 
-        Ok(Registry {
+        let mirrors = connection.mirrors.clone();
+
+        let registry = Registry {
             connection,
             state,
             metrics: BackendMetrics::new(id, "registry"),
-        })
+        };
+
+        for mirror in mirrors.iter() {
+            if !mirror.config.auth_through {
+                registry.start_refresh_token_thread();
+                info!("Refresh token thread started.");
+                break;
+            }
+        }
+
+        Ok(registry)
     }
 
     fn get_authorization_info(auth: &Option<String>) -> Result<(String, String)> {
@@ -638,6 +680,72 @@ impl Registry {
             Ok((String::new(), String::new()))
         }
     }
+
+    fn start_refresh_token_thread(&self) {
+        let conn = self.connection.clone();
+        let state = self.state.clone();
+        // The default refresh token interval is 10 minutes.
+        let refresh_check_interval = 10 * 60;
+        thread::spawn(move || {
+            loop {
+                if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
+                    if let Some(next_refresh_timestamp) = state.refresh_token_time.load().as_deref()
+                    {
+                        // If the token will expire within the next refresh check interval, get a new token now.
+                        // Add a 20-second margin to handle edge cases.
+                        if now_timestamp.as_secs() + refresh_check_interval + 20
+                            >= *next_refresh_timestamp
+                        {
+                            if let Some(cached_bearer_auth) =
+                                state.cached_bearer_auth.load().as_deref()
+                            {
+                                let mut cached_bearer_auth_clone = cached_bearer_auth.clone();
+                                if let Ok(url) = Url::parse(&cached_bearer_auth_clone.realm) {
+                                    let last_cached_auth = state.cached_auth.get();
+
+                                    let query: Vec<(_, _)> =
+                                        url.query_pairs().filter(|p| p.0 != "grant_type").collect();
+
+                                    let mut refresh_url = url.clone();
+                                    refresh_url.set_query(None);
+
+                                    for pair in query {
+                                        refresh_url.query_pairs_mut().append_pair(
+                                            &pair.0.to_string()[..],
+                                            &pair.1.to_string()[..],
+                                        );
+                                    }
+                                    refresh_url
+                                        .query_pairs_mut()
+                                        .append_pair("grant_type", "refresh_token");
+                                    cached_bearer_auth_clone.realm = refresh_url.to_string();
+
+                                    let token = state.get_token(cached_bearer_auth_clone, &conn);
+
+                                    if let Ok(token) = token {
+                                        let new_cached_auth = format!("Bearer {}", token);
+                                        info!(
+                                            "Authorization token for registry has been refreshed."
+                                        );
+                                        // Refresh authorization token
+                                        state.cached_auth.set(&last_cached_auth, new_cached_auth);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if conn.shutdown.load(Ordering::Acquire) {
+                    break;
+                }
+                thread::sleep(Duration::from_secs(refresh_check_interval));
+                if conn.shutdown.load(Ordering::Acquire) {
+                    break;
+                }
+            }
+        });
+    }
 }
 
 impl BlobBackend for Registry {
@@ -723,6 +831,8 @@ mod tests {
             blob_redirected_host: "oss.alibaba-inc.com".to_string(),
             cached_auth: Default::default(),
             cached_redirect: Default::default(),
+            refresh_token_time: ArcSwapOption::new(None),
+            cached_bearer_auth: ArcSwapOption::new(None),
         };
 
         assert_eq!(
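
Note (not part of the patch): a minimal, standalone sketch of how the new `#[serde(default)]` fields on `MirrorConfig` behave when a mirror entry omits them. The struct below is a simplified stand-in for the definition added in api/src/http.rs; it assumes `serde` (with the derive feature) and `serde_json` are available as dependencies, and the JSON value is illustrative only.

use std::collections::HashMap;

use serde::Deserialize;

fn default_check_interval() -> u64 {
    5
}

fn default_failure_limit() -> u8 {
    5
}

// Simplified stand-in for the MirrorConfig added in the patch above.
#[derive(Debug, Deserialize)]
#[serde(default)]
struct MirrorConfig {
    host: String,
    headers: HashMap<String, String>,
    auth_through: bool,
    #[serde(default = "default_check_interval")]
    health_check_interval: u64,
    #[serde(default = "default_failure_limit")]
    failure_limit: u8,
    ping_url: String,
}

impl Default for MirrorConfig {
    fn default() -> Self {
        Self {
            host: String::new(),
            headers: HashMap::new(),
            auth_through: false,
            health_check_interval: 5,
            failure_limit: 5,
            ping_url: String::new(),
        }
    }
}

fn main() {
    // Only `host` is given; the health-checking fields fall back to their defaults.
    let cfg: MirrorConfig =
        serde_json::from_str(r#"{ "host": "http://dragonfly1.io:65001" }"#).unwrap();
    assert_eq!(cfg.health_check_interval, 5);
    assert_eq!(cfg.failure_limit, 5);
    assert!(cfg.ping_url.is_empty());
    println!("{:?}", cfg);
}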