Skip to content

Commit

Permalink
Fix retry logic (#2573) (#2572) (#2574)
Browse files Browse the repository at this point in the history
* Fix retry logic (#2573) (#2572)

* Fix logical conflicts

* Rework tests
  • Loading branch information
tustvold authored Aug 25, 2022
1 parent 9822d62 commit 8eea918
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 133 deletions.
1 change: 1 addition & 0 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ dotenv = "0.15.0"
tempfile = "3.1.0"
futures-test = "0.3"
rand = "0.8"
hyper = { version = "0.14", features = ["server"] }
50 changes: 20 additions & 30 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,36 +52,48 @@ const STRICT_PATH_ENCODE_SET: AsciiSet = STRICT_ENCODE_SET.remove(b'/');
pub(crate) enum Error {
#[snafu(display("Error performing get request {}: {}", path, source))]
GetRequest {
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error fetching get response body {}: {}", path, source))]
GetResponseBody {
source: reqwest::Error,
path: String,
},

#[snafu(display("Error performing put request {}: {}", path, source))]
PutRequest {
source: reqwest::Error,
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error performing delete request {}: {}", path, source))]
DeleteRequest {
source: reqwest::Error,
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error performing copy request {}: {}", path, source))]
CopyRequest {
source: reqwest::Error,
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error performing list request: {}", source))]
ListRequest { source: reqwest::Error },
ListRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting list response body: {}", source))]
ListResponseBody { source: reqwest::Error },

#[snafu(display("Error performing create multipart request: {}", source))]
CreateMultipartRequest { source: reqwest::Error },
CreateMultipartRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting create multipart response body: {}", source))]
CreateMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Error performing complete multipart request: {}", source))]
CompleteMultipartRequest { source: reqwest::Error },
CompleteMultipartRequest { source: crate::client::retry::Error },

#[snafu(display("Got invalid list response: {}", source))]
InvalidListResponse { source: quick_xml::de::DeError },
Expand Down Expand Up @@ -259,10 +271,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?
.error_for_status()
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
Expand Down Expand Up @@ -290,10 +298,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
.context(PutRequestSnafu {
path: path.as_ref(),
})?
.error_for_status()
.context(PutRequestSnafu {
path: path.as_ref(),
})?;
Expand All @@ -316,10 +320,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
.context(DeleteRequestSnafu {
path: path.as_ref(),
})?
.error_for_status()
.context(DeleteRequestSnafu {
path: path.as_ref(),
})?;
Expand All @@ -339,10 +339,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
.context(CopyRequestSnafu {
path: from.as_ref(),
})?
.error_for_status()
.context(CopyRequestSnafu {
path: from.as_ref(),
})?;
Expand Down Expand Up @@ -385,11 +381,9 @@ impl S3Client {
.send_retry(&self.config.retry_config)
.await
.context(ListRequestSnafu)?
.error_for_status()
.context(ListRequestSnafu)?
.bytes()
.await
.context(ListRequestSnafu)?;
.context(ListResponseBodySnafu)?;

let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
.context(InvalidListResponseSnafu)?;
Expand Down Expand Up @@ -430,11 +424,9 @@ impl S3Client {
.send_retry(&self.config.retry_config)
.await
.context(CreateMultipartRequestSnafu)?
.error_for_status()
.context(CreateMultipartRequestSnafu)?
.bytes()
.await
.context(CreateMultipartRequestSnafu)?;
.context(CreateMultipartResponseBodySnafu)?;

let response: InitiateMultipart = quick_xml::de::from_reader(response.reader())
.context(InvalidMultipartResponseSnafu)?;
Expand Down Expand Up @@ -470,8 +462,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
.context(CompleteMultipartRequestSnafu)?
.error_for_status()
.context(CompleteMultipartRequestSnafu)?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl ObjectStore for AmazonS3 {
.await?
.bytes()
.await
.map_err(|source| client::Error::GetRequest {
.map_err(|source| client::Error::GetResponseBody {
source,
path: location.to_string(),
})?;
Expand Down
37 changes: 14 additions & 23 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,30 +41,39 @@ use url::Url;
pub(crate) enum Error {
#[snafu(display("Error performing get request {}: {}", path, source))]
GetRequest {
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error getting get response body {}: {}", path, source))]
GetResponseBody {
source: reqwest::Error,
path: String,
},

#[snafu(display("Error performing put request {}: {}", path, source))]
PutRequest {
source: reqwest::Error,
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error performing delete request {}: {}", path, source))]
DeleteRequest {
source: reqwest::Error,
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error performing copy request {}: {}", path, source))]
CopyRequest {
source: reqwest::Error,
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error performing list request: {}", source))]
ListRequest { source: reqwest::Error },
ListRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting list response body: {}", source))]
ListResponseBody { source: reqwest::Error },

#[snafu(display("Error performing create multipart request: {}", source))]
CreateMultipartRequest { source: reqwest::Error },
Expand Down Expand Up @@ -218,10 +227,6 @@ impl AzureClient {
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
.context(PutRequestSnafu {
path: path.as_ref(),
})?
.error_for_status()
.context(PutRequestSnafu {
path: path.as_ref(),
})?;
Expand Down Expand Up @@ -259,10 +264,6 @@ impl AzureClient {
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?
.error_for_status()
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
Expand All @@ -286,10 +287,6 @@ impl AzureClient {
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
.context(DeleteRequestSnafu {
path: path.as_ref(),
})?
.error_for_status()
.context(DeleteRequestSnafu {
path: path.as_ref(),
})?;
Expand Down Expand Up @@ -328,10 +325,6 @@ impl AzureClient {
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
.context(CopyRequestSnafu {
path: from.as_ref(),
})?
.error_for_status()
.context(CopyRequestSnafu {
path: from.as_ref(),
})?;
Expand Down Expand Up @@ -373,11 +366,9 @@ impl AzureClient {
.send_retry(&self.config.retry_config)
.await
.context(ListRequestSnafu)?
.error_for_status()
.context(ListRequestSnafu)?
.bytes()
.await
.context(ListRequestSnafu)?;
.context(ListResponseBodySnafu)?;

let mut response: ListResultInternal =
quick_xml::de::from_reader(response.reader())
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl ObjectStore for MicrosoftAzure {
.await?
.bytes()
.await
.map_err(|source| client::Error::GetRequest {
.map_err(|source| client::Error::GetResponseBody {
source,
path: location.to_string(),
})?;
Expand Down
13 changes: 7 additions & 6 deletions object_store/src/client/oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ pub enum Error {
UnsupportedKey { encoding: String },

#[snafu(display("Error performing token request: {}", source))]
TokenRequest { source: reqwest::Error },
TokenRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting token response body: {}", source))]
TokenResponseBody { source: reqwest::Error },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -181,11 +184,9 @@ impl OAuthProvider {
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
.error_for_status()
.context(TokenRequestSnafu)?
.json()
.await
.context(TokenRequestSnafu)?;
.context(TokenResponseBodySnafu)?;

let token = TemporaryToken {
token: response.access_token,
Expand Down Expand Up @@ -289,10 +290,10 @@ impl ClientSecretOAuthProvider {
.await
.context(TokenRequestSnafu)?
.error_for_status()
.context(TokenRequestSnafu)?
.context(TokenResponseBodySnafu)?
.json()
.await
.context(TokenRequestSnafu)?;
.context(TokenResponseBodySnafu)?;

let token = TemporaryToken {
token: response.access_token,
Expand Down
Loading

0 comments on commit 8eea918

Please sign in to comment.