storage: add mirror health checking support
Currently, a mirror is set to unavailable once its failure count reaches failure_limit.
We add mirror health checking, which recovers unavailable mirror servers:
failure_limit is the number of failed requests at which a mirror is marked unavailable,
and health_check_interval is the interval, in seconds, at which unavailable mirrors are probed and recovered.
Moreover, we avoid forwarding authentication (401) requests to a mirror whose auth_through is false.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
sctb512 committed Oct 29, 2022
1 parent 9d164c4 commit 8593a35
Showing 3 changed files with 252 additions and 96 deletions.
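
Before the diff, a brief illustration of the new configuration knobs may help. The sketch below is not part of the patch: it defines a local stand-in for MirrorConfig with the same fields and defaults as the diff in api/src/http.rs, then parses a hypothetical mirror entry that omits the new fields to show that health_check_interval and failure_limit fall back to 5 seconds and 5 failures. It assumes the serde (with derive) and serde_json crates are available.

use std::collections::HashMap;

use serde::Deserialize;

/// Local stand-in for the `MirrorConfig` defined in api/src/http.rs below;
/// only used to illustrate the default values of the new fields.
#[derive(Debug, Deserialize)]
#[serde(default)]
struct MirrorConfigSketch {
    host: String,
    headers: HashMap<String, String>,
    auth_through: bool,
    health_check_interval: u64,
    failure_limit: u8,
}

impl Default for MirrorConfigSketch {
    fn default() -> Self {
        Self {
            host: String::new(),
            headers: HashMap::new(),
            auth_through: false,
            health_check_interval: 5,
            failure_limit: 5,
        }
    }
}

fn main() {
    // Hypothetical mirror entry; the two new fields are omitted on purpose.
    let cfg: MirrorConfigSketch = serde_json::from_str(
        r#"{ "host": "http://127.0.0.1:65001", "auth_through": false }"#,
    )
    .unwrap();
    assert_eq!(cfg.health_check_interval, 5); // default: probe every 5 seconds
    assert_eq!(cfg.failure_limit, 5); // default: disable after 5 failures
}

The actual patch uses both a container-level #[serde(default)] and per-field default functions; for these two fields both resolve to 5, as in the sketch.
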
27 changes: 27 additions & 0 deletions api/src/http.rs
@@ -400,6 +400,7 @@ impl Default for ProxyConfig {

/// Configuration for mirror.
#[derive(Clone, Deserialize, Serialize, Debug)]
#[serde(default)]
pub struct MirrorConfig {
/// Mirror server URL, for example http://127.0.0.1:65001.
pub host: String,
@@ -412,6 +413,24 @@ pub struct MirrorConfig {
/// e.g. when using a Dragonfly server as the mirror, routing authorization through it will hurt performance.
#[serde(default)]
pub auth_through: bool,
/// Interval of mirror health checking, in seconds.
#[serde(default = "default_check_interval")]
pub health_check_interval: u64,
/// Number of failures after which the mirror is considered unavailable.
#[serde(default = "default_failure_limit")]
pub failure_limit: u8,
}

impl Default for MirrorConfig {
fn default() -> Self {
Self {
host: String::new(),
headers: HashMap::new(),
auth_through: false,
health_check_interval: 5,
failure_limit: 5,
}
}
}

#[derive(Debug)]
@@ -623,6 +642,14 @@ fn default_http_timeout() -> u32 {
5
}

fn default_check_interval() -> u64 {
5
}

fn default_failure_limit() -> u8 {
5
}

fn default_work_dir() -> String {
".".to_string()
}
205 changes: 112 additions & 93 deletions storage/src/backend/connection.rs
@@ -12,7 +12,6 @@ use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use arc_swap::ArcSwapOption;
use log::{max_level, Level};

use reqwest::header::{HeaderName, HeaderValue};
@@ -223,26 +222,19 @@ pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult<Re
pub(crate) struct Connection {
client: Client,
proxy: Option<Proxy>,
mirror_state: MirrorState,
shutdown: AtomicBool,
}

#[derive(Debug)]
pub(crate) struct MirrorState {
mirrors: Vec<Arc<Mirror>>,
/// Current mirror, None if there is no mirror available.
current: ArcSwapOption<Mirror>,
pub mirrors: Vec<Arc<Mirror>>,
pub shutdown: AtomicBool,
}

#[derive(Debug)]
pub(crate) struct Mirror {
/// Information for mirror from configuration file.
config: MirrorConfig,
pub config: MirrorConfig,
/// Mirror status; set to false atomically when the mirror is not working.
status: AtomicBool,
/// Number of failed requests to the mirror; the status will be set to false when failed_times reaches failure_limit.
failed_times: AtomicU8,
/// Failed limit for mirror.
/// Number of failures after which the mirror is considered unavailable.
failure_limit: u8,
}

@@ -294,23 +286,16 @@ impl Connection {
mirrors.push(Arc::new(Mirror {
config: mirror_config.clone(),
status: AtomicBool::from(true),
// Maybe read from configuration file
failure_limit: 5,
failed_times: AtomicU8::from(0),
failure_limit: mirror_config.failure_limit,
}));
}
}

let current = if let Some(first_mirror) = mirrors.first() {
ArcSwapOption::from(Some(first_mirror.clone()))
} else {
ArcSwapOption::from(None)
};

let connection = Arc::new(Connection {
client,
proxy,
mirror_state: MirrorState { mirrors, current },
mirrors,
shutdown: AtomicBool::new(false),
});

@@ -363,7 +348,50 @@ impl Connection {
});
}
}
// TODO: check mirrors' health
// Check mirrors' health
for mirror in connection.mirrors.iter() {
let conn = connection.clone();
let connect_timeout = config.connect_timeout;
// Spawn a thread to update the health status of the mirror server
let mirror_cloned = mirror.clone();
thread::spawn(move || {
let mirror_health_url = format!("{}/v2", mirror_cloned.config.host);

loop {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"Mirror {} unhealthy, try to recover",
mirror_cloned.config.host
);
let client = Client::new();
let _ = client
.get(mirror_health_url.as_str())
.timeout(Duration::from_secs(connect_timeout as u64))
.send()
.map(|resp| {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!("Mirror {} recovered", mirror_cloned.config.host);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
});
}

if conn.shutdown.load(Ordering::Acquire) {
break;
}
thread::sleep(Duration::from_secs(
mirror_cloned.config.health_check_interval,
));
if conn.shutdown.load(Ordering::Acquire) {
break;
}
}
});
}

Ok(connection)
}
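
A note on the probe above: the health checker treats any response below INTERNAL_SERVER_ERROR as a sign of recovery, because a registry-style endpoint at {host}/v2 may answer an unauthenticated request with 401 or another non-5xx status, which still proves the server is reachable. Below is a minimal standalone sketch of that check, assuming reqwest with the blocking feature and a hypothetical mirror address.

use std::time::Duration;

use reqwest::blocking::Client;
use reqwest::StatusCode;

/// Returns true when the mirror answered with anything below 500, i.e. the
/// server is reachable even if it rejects the unauthenticated probe.
fn mirror_is_healthy(host: &str, connect_timeout_secs: u64) -> bool {
    let url = format!("{}/v2", host);
    Client::new()
        .get(url.as_str())
        .timeout(Duration::from_secs(connect_timeout_secs))
        .send()
        .map(|resp| resp.status() < StatusCode::INTERNAL_SERVER_ERROR)
        .unwrap_or(false)
}

fn main() {
    // Hypothetical mirror address, as in the configuration example above.
    println!("healthy: {}", mirror_is_healthy("http://127.0.0.1:65001", 5));
}
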
@@ -449,92 +477,83 @@ impl Connection {
}
}

let current_mirror = self.mirror_state.current.load();

if let Some(mirror) = current_mirror.as_ref() {
// With `auth_through` disabled, we do not send authentication
// requests to the mirror, mainly because mirrors like P2P/Dragonfly perform poorly when
// relaying non-data requests. However, a previously returned token may have expired,
// so the mirror might still respond with status code UNAUTHORIZED, which should be handled
// by sending an authentication request to the original registry.
//
// - For a non-authentication request with a token in the request header, handle it as a usual request to the registry.
// Such a request should already carry the token in its header.
// - For an authentication request:
// 1. auth_through is disabled (false): skip the mirror handling below and jump to the original registry handler.
// 2. auth_through is enabled (true): try to get authenticated by the mirror and also handle status code UNAUTHORIZED.
if mirror.config.auth_through
|| !requesting_auth && headers.contains_key(HEADER_AUTHORIZATION)
{
let data_cloned = data.as_ref().cloned();

for (key, value) in mirror.config.headers.iter() {
headers.insert(
HeaderName::from_str(key).unwrap(),
HeaderValue::from_str(value).unwrap(),
);
if !self.mirrors.is_empty() {
let mut requested_all_mirrors = true;
for mirror in self.mirrors.iter() {
// With `auth_through` disabled, we do not send authentication
// requests to the mirror, mainly because mirrors like P2P/Dragonfly perform poorly when
// relaying non-data requests. However, a previously returned token may have expired,
// so the mirror might still respond with status code UNAUTHORIZED, which should be handled
// by sending an authentication request to the original registry.
//
// - For a non-authentication request with a token in the request header, handle it as a usual request to the registry.
// Such a request should already carry the token in its header.
// - For an authentication request:
// 1. auth_through is disabled (false): skip the mirror handling below and jump to the original registry handler.
// 2. auth_through is enabled (true): try to get authenticated by the mirror and also handle status code UNAUTHORIZED.
if !mirror.config.auth_through
&& (!headers.contains_key(HEADER_AUTHORIZATION) || requesting_auth)
{
requested_all_mirrors = false;
break;
}

let current_url = mirror.mirror_url(url)?;
debug!("mirror server url {}", current_url);
if mirror.status.load(Ordering::Relaxed) {
let data_cloned = data.as_ref().cloned();

let result = self.call_inner(
&self.client,
method.clone(),
current_url.as_str(),
&query,
data_cloned,
headers,
catch_status,
false,
);

match result {
Ok(resp) => {
// If the response status >= INTERNAL_SERVER_ERROR, fall back to original registry.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
return Ok(resp);
}
}
Err(err) => {
warn!(
"request mirror server failed, mirror: {:?}, error: {:?}",
mirror, err
for (key, value) in mirror.config.headers.iter() {
headers.insert(
HeaderName::from_str(key).unwrap(),
HeaderValue::from_str(value).unwrap(),
);
mirror.failed_times.fetch_add(1, Ordering::Relaxed);
}

let current_url = mirror.mirror_url(url)?;
debug!("mirror server url {}", current_url);

let result = self.call_inner(
&self.client,
method.clone(),
current_url.as_str(),
&query,
data_cloned,
headers,
catch_status,
false,
);

if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
match result {
Ok(resp) => {
// If the response status >= INTERNAL_SERVER_ERROR, move to the next mirror server.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
return Ok(resp);
}
}
Err(err) => {
warn!(
"reach to failure limit {}, disable mirror: {:?}",
mirror.failure_limit, mirror
"request mirror server failed, mirror: {:?}, error: {:?}",
mirror, err
);
mirror.status.store(false, Ordering::Relaxed);

let mut idx = 0;
loop {
if idx == self.mirror_state.mirrors.len() {
break None;
}
let m = &self.mirror_state.mirrors[idx];
if m.status.load(Ordering::Relaxed) {
warn!("mirror server has been changed to {:?}", m);
break Some(m);
}

idx += 1;
mirror.failed_times.fetch_add(1, Ordering::Relaxed);

if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
warn!(
"reach to failure limit {}, disable mirror: {:?}",
mirror.failure_limit, mirror
);
mirror.status.store(false, Ordering::Relaxed);
}
.map(|m| self.mirror_state.current.store(Some(m.clone())))
.unwrap_or_else(|| self.mirror_state.current.store(None));
}
}
}
// FIXME: Move to the next available mirror directly when mirror health checking is available.
warn!("Failed to request mirror server, fallback to original server.");
// Remove mirror-related headers to avoid sending them to the original registry.
// Remove mirror-related headers to avoid sending them to the next mirror server and original registry.
for (key, _) in mirror.config.headers.iter() {
headers.remove(HeaderName::from_str(key).unwrap());
}
}
if requested_all_mirrors {
warn!("Request to all mirror server failed, fallback to original server.");
}
}

self.call_inner(
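
To recap the fallback logic in the call() diff above, the sketch below reduces it to two hypothetical helpers: should_try_mirror decides whether a request may go to a given mirror (authentication traffic only when auth_through is enabled, and only while the mirror is marked available), and record_failure counts failed requests and disables the mirror once failed_times reaches failure_limit, leaving recovery to the health-check thread. The struct and function names are illustrative, not part of the patch, and the real code breaks out to the original registry as soon as an authentication request meets a mirror with auth_through disabled, rather than iterating further.

use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};

/// Illustrative stand-in for the Mirror struct in connection.rs.
struct MirrorSketch {
    auth_through: bool,
    status: AtomicBool,
    failed_times: AtomicU8,
    failure_limit: u8,
}

/// A request may go to the mirror only if the mirror is currently available
/// and, for authentication traffic, only if auth_through is enabled.
fn should_try_mirror(m: &MirrorSketch, requesting_auth: bool, has_auth_header: bool) -> bool {
    let auth_ok = m.auth_through || (!requesting_auth && has_auth_header);
    auth_ok && m.status.load(Ordering::Relaxed)
}

/// Count a failed request; after failure_limit failures the mirror is marked
/// unavailable until the health-check thread recovers it.
fn record_failure(m: &MirrorSketch) {
    // fetch_add returns the previous value, hence the +1.
    if m.failed_times.fetch_add(1, Ordering::Relaxed) + 1 >= m.failure_limit {
        m.status.store(false, Ordering::Relaxed);
    }
}

fn main() {
    let m = MirrorSketch {
        auth_through: false,
        status: AtomicBool::new(true),
        failed_times: AtomicU8::new(0),
        failure_limit: 5,
    };
    // An authentication request is not sent to this mirror at all.
    assert!(!should_try_mirror(&m, true, false));
    // A data request carrying a token would be, until five failures disable it.
    assert!(should_try_mirror(&m, false, true));
    for _ in 0..5 {
        record_failure(&m);
    }
    assert!(!should_try_mirror(&m, false, true));
}
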