Skip to content

Commit

Permalink
storage: refresh token to avoid forwarding to P2P/dragonfly
Browse files Browse the repository at this point in the history
Forward 401 response to P2P/dragonfly will affect performance
when the authorization token is expired.
When there is a mirror that auth_through false, we refresh the token regularly
to avoid forwarding the 401 response to mirror.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
  • Loading branch information
sctb512 committed Oct 29, 2022
1 parent d1b21ab commit b7cedcd
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 6 deletions.
6 changes: 3 additions & 3 deletions storage/src/backend/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,14 @@ pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult<Re
pub(crate) struct Connection {
client: Client,
proxy: Option<Proxy>,
mirrors: Vec<Arc<Mirror>>,
shutdown: AtomicBool,
pub mirrors: Vec<Arc<Mirror>>,
pub shutdown: AtomicBool,
}

#[derive(Debug)]
pub(crate) struct Mirror {
/// Information for mirror from configuration file.
config: MirrorConfig,
pub config: MirrorConfig,
/// Mirror status, it will be set to false by atomic operation when mirror is not work.
status: AtomicBool,
/// Failed times requesting mirror, the status will be marked as false when failed_times = failure_limit.
Expand Down
116 changes: 113 additions & 3 deletions storage/src/backend/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
//! Storage backend driver to access blobs on container image registry.
use std::collections::HashMap;
use std::io::{Error, Read, Result};
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use arc_swap::ArcSwapOption;
use reqwest::blocking::Response;
pub use reqwest::header::HeaderMap;
use reqwest::header::{HeaderValue, CONTENT_LENGTH};
Expand Down Expand Up @@ -102,6 +106,12 @@ impl HashCache {
#[derive(Clone, serde::Deserialize)]
struct TokenResponse {
token: String,
#[serde(default = "default_expires_in")]
expires_in: u64,
}

fn default_expires_in() -> u64 {
10 * 60
}

#[derive(Debug)]
Expand All @@ -110,7 +120,7 @@ struct BasicAuth {
realm: String,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct BearerAuth {
realm: String,
service: String,
Expand Down Expand Up @@ -149,6 +159,11 @@ struct RegistryState {
// Cache 30X redirect url
// Example: RwLock<HashMap<"<blob_id>", "<redirected_url>">>
cached_redirect: HashCache,

// The expiration time of the token, which is obtained from the registry server.
refresh_token_time: ArcSwapOption<u64>,
// Cache bearer auth for refreshing token.
cached_bearer_auth: ArcSwapOption<BearerAuth>,
}

impl RegistryState {
Expand Down Expand Up @@ -207,6 +222,19 @@ impl RegistryState {
e
))
})?;
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
self.refresh_token_time
.store(Some(Arc::new(now_timestamp.as_secs() + ret.expires_in)));
warn!(
"cached_bearer_auth: {:?}, next time: {}",
auth,
now_timestamp.as_secs() + ret.expires_in
);
}

// Cache bearer auth for refreshing token.
self.cached_bearer_auth.store(Some(Arc::new(auth)));

Ok(ret.token)
}

Expand Down Expand Up @@ -605,13 +633,27 @@ impl Registry {
blob_url_scheme: config.blob_url_scheme,
blob_redirected_host: config.blob_redirected_host,
cached_redirect: HashCache::new(),
refresh_token_time: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
});

Ok(Registry {
let mirrors = connection.mirrors.clone();

let registry = Registry {
connection,
state,
metrics: BackendMetrics::new(id, "registry"),
})
};

for mirror in mirrors.iter() {
if !mirror.config.auth_through {
info!("Refresh token thread started.");
registry.run_refresh_token_thread();
break;
}
}

Ok(registry)
}

fn get_authorization_info(auth: &Option<String>) -> Result<(String, String)> {
Expand All @@ -638,6 +680,72 @@ impl Registry {
Ok((String::new(), String::new()))
}
}

fn run_refresh_token_thread(&self) {
let conn = self.connection.clone();
let state = self.state.clone();
// The default refresh token internal is 10 minutes.
let refresh_check_internal = 10 * 60;
thread::spawn(move || {
loop {
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
if let Some(next_refresh_timestamp) = state.refresh_token_time.load().as_deref()
{
// If the token will expire in next refresh check internal, get new token now.
// Add 20 seconds to handle critical cases.
if now_timestamp.as_secs() + refresh_check_internal + 20
>= *next_refresh_timestamp
{
if let Some(cached_bearer_auth) =
state.cached_bearer_auth.load().as_deref()
{
let mut cached_bearer_auth_clone = cached_bearer_auth.clone();
if let Ok(url) = Url::parse(&cached_bearer_auth_clone.realm) {
let last_cached_auth = state.cached_auth.get();

let query: Vec<(_, _)> =
url.query_pairs().filter(|p| p.0 != "grant_type").collect();

let mut refresh_url = url.clone();
refresh_url.set_query(None);

for pair in query {
refresh_url.query_pairs_mut().append_pair(
&pair.0.to_string()[..],
&pair.1.to_string()[..],
);
}
refresh_url
.query_pairs_mut()
.append_pair("grant_type", "refresh_token");
cached_bearer_auth_clone.realm = refresh_url.to_string();

let token = state.get_token(cached_bearer_auth_clone, &conn);

if let Ok(token) = token {
let new_cached_auth = format!("Bearer {}", token);
info!(
"Authorization token for registry has been refreshed."
);
// Refresh authorization token
state.cached_auth.set(&last_cached_auth, new_cached_auth);
}
}
}
}
}
}

if conn.shutdown.load(Ordering::Acquire) {
break;
}
thread::sleep(Duration::from_secs(refresh_check_internal));
if conn.shutdown.load(Ordering::Acquire) {
break;
}
}
});
}
}

impl BlobBackend for Registry {
Expand Down Expand Up @@ -723,6 +831,8 @@ mod tests {
blob_redirected_host: "oss.alibaba-inc.com".to_string(),
cached_auth: Default::default(),
cached_redirect: Default::default(),
refresh_token_time: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
};

assert_eq!(
Expand Down

0 comments on commit b7cedcd

Please sign in to comment.