Skip to content

Commit

Permalink
Retry GCP requests on server error (#2243)
Browse files Browse the repository at this point in the history
* Retry GCP requests on server error

* Also retry OAuth

* Lower default backoff configuration

* Add retry disclaimer

* Add retry_timeout

* Add logging

* Fix features
  • Loading branch information
tustvold authored Aug 3, 2022
1 parent b826162 commit 299908e
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 24 deletions.
5 changes: 3 additions & 2 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ quick-xml = { version = "0.23.0", features = ["serialize"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
ring = { version = "0.16", default-features = false, features = ["std"] }
base64 = { version = "0.13", default-features = false, optional = true }
rand = { version = "0.8", default-features = false, optional = true, features = ["std", "std_rng"] }
# for rusoto
hyper = { version = "0.14", optional = true, default-features = false }
# for rusoto
Expand All @@ -58,7 +59,7 @@ percent-encoding = "2.1"
rusoto_core = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
rusoto_credential = { version = "0.48.0", optional = true, default-features = false }
rusoto_s3 = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
rusoto_sts = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
rusoto_sts = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
snafu = "0.7"
tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time", "io-util"] }
tracing = { version = "0.1" }
Expand All @@ -71,7 +72,7 @@ walkdir = "2"
[features]
azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"]
azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"]
gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64"]
gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand"]
aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "hyper-rustls"]

[dev-dependencies] # In alphabetical order
Expand Down
156 changes: 156 additions & 0 deletions object_store/src/client/backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use rand::prelude::*;
use std::time::Duration;

/// Exponential backoff with jitter
///
/// Describes how the delays handed out by [`Backoff`] start, grow and are capped.
///
/// See <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
// NOTE(review): `Copy` is presumably withheld so that non-`Copy` fields can be
// added later without breaking callers — confirm before removing this allow.
#[allow(missing_copy_implementations)]
#[derive(Debug, Clone)]
pub struct BackoffConfig {
/// The initial backoff duration
///
/// This is the first delay returned by [`Backoff`] and the lower bound of
/// the range every later delay is sampled from
pub init_backoff: Duration,
/// The maximum backoff duration
///
/// Sampled delays larger than this are clamped down to it
pub max_backoff: Duration,
/// The base of the exponential to use
///
/// The previous delay is multiplied by this value to obtain the upper
/// bound of the range the next delay is sampled from
pub base: f64,
}

impl Default for BackoffConfig {
    /// Starts at 100 milliseconds, caps at 15 seconds and uses a base of 2.
    fn default() -> Self {
        let init_backoff = Duration::from_millis(100);
        let max_backoff = Duration::from_secs(15);

        Self {
            init_backoff,
            max_backoff,
            base: 2.0,
        }
    }
}

/// [`Backoff`] can be created from a [`BackoffConfig`]
///
/// Consecutive calls to [`Backoff::next`] will return the next backoff interval
pub struct Backoff {
/// Lower bound, in seconds, of the range each new backoff is sampled from
init_backoff: f64,
/// The backoff, in seconds, that the next call to [`Backoff::next`] returns
next_backoff_secs: f64,
/// Cap, in seconds, applied to every sampled backoff
max_backoff_secs: f64,
/// Multiplier applied to `next_backoff_secs` to obtain the upper bound of
/// the sampling range
base: f64,
/// Optional source of randomness; `rand::thread_rng()` is used when `None`
rng: Option<Box<dyn RngCore + Sync + Send>>,
}

impl std::fmt::Debug for Backoff {
    /// Formats the numeric state of the backoff.
    ///
    /// The `rng` field is left out of the output: its boxed trait object
    /// carries no `Debug` bound.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            init_backoff,
            next_backoff_secs,
            max_backoff_secs,
            base,
            rng: _,
        } = self;

        let mut out = f.debug_struct("Backoff");
        out.field("init_backoff", init_backoff);
        out.field("next_backoff_secs", next_backoff_secs);
        out.field("max_backoff_secs", max_backoff_secs);
        out.field("base", base);
        out.finish()
    }
}

impl Backoff {
    /// Create a new [`Backoff`] from the provided [`BackoffConfig`]
    ///
    /// Jitter is drawn from [`rand::thread_rng()`].
    pub fn new(config: &BackoffConfig) -> Self {
        Self::new_with_rng(config, None)
    }

    /// Creates a new `Backoff` with the optional `rng`
    ///
    /// Uses [`rand::thread_rng()`] if no rng provided
    pub fn new_with_rng(
        config: &BackoffConfig,
        rng: Option<Box<dyn RngCore + Sync + Send>>,
    ) -> Self {
        let init_backoff = config.init_backoff.as_secs_f64();
        Self {
            init_backoff,
            next_backoff_secs: init_backoff,
            max_backoff_secs: config.max_backoff.as_secs_f64(),
            base: config.base,
            rng,
        }
    }

    /// Returns the next backoff duration to wait for
    ///
    /// The returned value is the backoff computed by the previous call (or
    /// the initial backoff on the first call). As a side effect the following
    /// backoff is sampled uniformly from
    /// `init_backoff..(current_backoff * base)` and clamped to the maximum
    /// backoff.
    ///
    /// If that range is empty, e.g. because `base <= 1` or the initial
    /// backoff is zero, the following backoff is the initial backoff, i.e.
    /// the backoff stays constant instead of panicking.
    pub fn next(&mut self) -> Duration {
        let lower = self.init_backoff;
        let upper = self.next_backoff_secs * self.base;

        // `gen_range` panics when handed an empty range, so only sample when
        // there is something to sample from. The comparison is also false for
        // a NaN upper bound, which routes that case to the fallback as well.
        let rand_backoff = if upper > lower {
            let range = lower..upper;
            match self.rng.as_mut() {
                Some(rng) => rng.gen_range(range),
                None => thread_rng().gen_range(range),
            }
        } else {
            lower
        };

        let next_backoff = self.max_backoff_secs.min(rand_backoff);
        Duration::from_secs_f64(std::mem::replace(
            &mut self.next_backoff_secs,
            next_backoff,
        ))
    }
}

// Unit tests for `Backoff`, driven by deterministic `StepRng` instances so
// that the "random" jitter is fully predictable.
#[cfg(test)]
mod tests {
use super::*;
use rand::rngs::mock::StepRng;

/// Checks the sequence produced by `Backoff::next` when the rng always
/// yields the minimum, the maximum, or the mid point of the sampled range.
#[test]
fn test_backoff() {
let init_backoff_secs = 1.;
let max_backoff_secs = 500.;
let base = 3.;

let config = BackoffConfig {
init_backoff: Duration::from_secs_f64(init_backoff_secs),
max_backoff: Duration::from_secs_f64(max_backoff_secs),
base,
};

// Float comparison with a small absolute tolerance
let assert_fuzzy_eq =
|a: f64, b: f64| assert!((b - a).abs() < 0.0001, "{} != {}", a, b);

// Create a static rng that takes the minimum of the range
let rng = Box::new(StepRng::new(0, 0));
let mut backoff = Backoff::new_with_rng(&config, Some(rng));

// The lower bound of the range is always the initial backoff, so the
// backoff never grows
for _ in 0..20 {
assert_eq!(backoff.next().as_secs_f64(), init_backoff_secs);
}

// Create a static rng that takes the maximum of the range
let rng = Box::new(StepRng::new(u64::MAX, 0));
let mut backoff = Backoff::new_with_rng(&config, Some(rng));

// Each backoff is (approximately) the previous one multiplied by `base`,
// until the configured maximum is reached
for i in 0..20 {
let value = (base.powi(i) * init_backoff_secs).min(max_backoff_secs);
assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
}

// Create a static rng that takes the mid point of the range
let rng = Box::new(StepRng::new(u64::MAX / 2, 0));
let mut backoff = Backoff::new_with_rng(&config, Some(rng));

// Each backoff lies half way between the initial backoff and the previous
// backoff multiplied by `base`, capped at the configured maximum
let mut value = init_backoff_secs;
for _ in 0..20 {
assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
value = (init_backoff_secs + (value * base - init_backoff_secs) / 2.)
.min(max_backoff_secs);
}
}
}
23 changes: 23 additions & 0 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Generic utilities for reqwest-based ObjectStore implementations
pub mod backoff;
pub mod oauth;
pub mod retry;
pub mod token;
12 changes: 9 additions & 3 deletions object_store/src/oauth.rs → object_store/src/client/oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use crate::token::TemporaryToken;
use crate::client::retry::RetryExt;
use crate::client::token::TemporaryToken;
use crate::RetryConfig;
use reqwest::{Client, Method};
use ring::signature::RsaKeyPair;
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -133,7 +135,11 @@ impl OAuthProvider {
}

/// Fetch a fresh token
pub async fn fetch_token(&self, client: &Client) -> Result<TemporaryToken<String>> {
pub async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
) -> Result<TemporaryToken<String>> {
let now = seconds_since_epoch();
let exp = now + 3600;

Expand Down Expand Up @@ -168,7 +174,7 @@ impl OAuthProvider {
let response: TokenResponse = client
.request(Method::POST, &self.audience)
.form(&body)
.send()
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
.error_for_status()
Expand Down
106 changes: 106 additions & 0 deletions object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! A shared HTTP client implementation incorporating retries
use crate::client::backoff::{Backoff, BackoffConfig};
use futures::future::BoxFuture;
use futures::FutureExt;
use reqwest::{Response, Result};
use std::time::{Duration, Instant};
use tracing::info;

/// Contains the configuration for how to respond to server errors
///
/// By default they will be retried up to some limit, using exponential
/// backoff with jitter. See [`BackoffConfig`] for more information
#[derive(Debug, Clone)]
pub struct RetryConfig {
/// The backoff configuration
///
/// Determines how long to wait between consecutive attempts
pub backoff: BackoffConfig,

/// The maximum number of times to retry a request
///
/// Set to 0 to disable retries
pub max_retries: usize,

/// The maximum length of time from the initial request
/// after which no further retries will be attempted
///
/// This not only bounds the length of time before a server
/// error will be surfaced to the application, but also bounds
/// the length of time a request's credentials must remain valid.
///
/// As requests are retried without renewing credentials or
/// regenerating request payloads, this number should be kept
/// below 5 minutes to avoid errors due to expired credentials
/// and/or request payloads
pub retry_timeout: Duration,
}

impl Default for RetryConfig {
fn default() -> Self {
Self {
backoff: Default::default(),
max_retries: 10,
retry_timeout: Duration::from_secs(3 * 60),
}
}
}

/// Extension trait adding a retrying variant of `send` to a request builder
pub trait RetryExt {
/// Dispatch a request with the given retry configuration
///
/// # Panics
///
/// This will panic if the request body is a stream
fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<Response>>;
}

impl RetryExt for reqwest::RequestBuilder {
    /// Sends the request, retrying with exponential backoff for as long as
    /// the server answers with a 5xx status.
    ///
    /// `send()` resolves to `Ok(Response)` for every HTTP status, so the
    /// status of a received response has to be inspected to detect server
    /// errors; matching on `Err` alone never sees them.
    ///
    /// Retries stop once `config.max_retries` attempts have been retried or
    /// `config.retry_timeout` has elapsed since the first attempt. The result
    /// of the last attempt is returned unmodified, so a response carrying an
    /// error status is still `Ok` and callers remain responsible for calling
    /// `error_for_status` if they want it turned into an error.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be cloned, which is the case when the
    /// request body is a stream.
    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<Response>> {
        let mut backoff = Backoff::new(&config.backoff);
        let max_retries = config.max_retries;
        let retry_timeout = config.retry_timeout;

        async move {
            let mut retries = 0;
            let now = Instant::now();

            loop {
                // Every attempt consumes a builder, so work on a copy
                let request = self.try_clone().expect("request body must be cloneable");
                let result = request.send().await;

                let is_server_error = match &result {
                    Ok(response) => response.status().is_server_error(),
                    Err(e) => e.status().map_or(false, |s| s.is_server_error()),
                };

                if !is_server_error
                    || retries >= max_retries
                    || now.elapsed() >= retry_timeout
                {
                    return result;
                }

                // Discard the failed attempt before waiting for the next one
                drop(result);

                let sleep = backoff.next();
                retries += 1;
                info!("Encountered server error, backing off for {} seconds, retry {} of {}", sleep.as_secs_f32(), retries, max_retries);
                tokio::time::sleep(sleep).await;
            }
        }
        .boxed()
    }
}
File renamed without changes.
Loading

0 comments on commit 299908e

Please sign in to comment.