From c1fedbedaeb906fcca4d13b03094095cbb42daef Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 2 Sep 2024 16:50:30 +0800 Subject: [PATCH] chore: Use backon to replace backoff Signed-off-by: Xuanwo --- crates/aws/Cargo.toml | 2 +- crates/aws/src/lib.rs | 294 ++++++++++++++++++++++-------------------- 2 files changed, 156 insertions(+), 140 deletions(-) diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index c47065dce4..e79d92a3d2 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -33,7 +33,7 @@ tokio = { workspace = true } regex = { workspace = true } uuid = { workspace = true, features = ["serde", "v4"] } url = { workspace = true } -backoff = { version = "0.4", features = [ "tokio" ] } +backon = { version = "1",default-features = false, features = [ "tokio-sleep" ] } hyper-tls = { version = "0.5", optional = true } [dev-dependencies] diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index 720a1e6a07..187462cb12 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -7,6 +7,7 @@ pub mod logstore; mod native; pub mod storage; use aws_config::SdkConfig; +use aws_sdk_dynamodb::error::SdkError; use aws_sdk_dynamodb::{ operation::{ create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError, @@ -283,28 +284,28 @@ impl DynamoDbLockClient { version: i64, ) -> Result, LockClientError> { let item = self - .retry(|| async { - match self - .dynamodb_client - .get_item() - .consistent_read(true) - .table_name(&self.config.lock_table_name) - .set_key(Some(self.get_primary_key(version, table_path))) - .send() - .await - { - Ok(x) => Ok(x), - Err(sdk_err) => match sdk_err.as_service_error() { - Some(GetItemError::ProvisionedThroughputExceededException(_)) => { - Err(backoff::Error::transient( - LockClientError::ProvisionedThroughputExceeded, - )) - } - _ => Err(backoff::Error::permanent(sdk_err.into())), - }, + .retry( + || async { + self.dynamodb_client + .get_item() + .consistent_read(true) + .table_name(&self.config.lock_table_name) + .set_key(Some(self.get_primary_key(version, table_path))) + .send() + .await + }, + |err| match err.as_service_error() { + Some(GetItemError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await + .map_err(|err| match err.as_service_error() { + Some(GetItemError::ProvisionedThroughputExceededException(_)) => { + LockClientError::ProvisionedThroughputExceeded } - }) - .await?; + _ => err.into(), + })?; item.item.as_ref().map(CommitEntry::try_from).transpose() } @@ -314,36 +315,38 @@ impl DynamoDbLockClient { table_path: &str, entry: &CommitEntry, ) -> Result<(), LockClientError> { - self.retry(|| async { - let item = create_value_map(entry, table_path); - match self - .dynamodb_client - .put_item() - .condition_expression(constants::CONDITION_EXPR_CREATE.as_str()) - .table_name(self.get_lock_table_name()) - .set_item(Some(item)) - .send() - .await - { - Ok(_) => Ok(()), - Err(err) => match err.as_service_error() { - Some(PutItemError::ProvisionedThroughputExceededException(_)) => Err( - backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), - ), - Some(PutItemError::ConditionalCheckFailedException(_)) => Err( - backoff::Error::permanent(LockClientError::VersionAlreadyExists { - table_path: table_path.to_owned(), - version: entry.version, - }), - ), - Some(PutItemError::ResourceNotFoundException(_)) => Err( - backoff::Error::permanent(LockClientError::LockTableNotFound), - ), - _ => Err(backoff::Error::permanent(err.into())), - }, + self.retry( + || async { + let item = create_value_map(entry, table_path); + let _ = self + .dynamodb_client + .put_item() + .condition_expression(constants::CONDITION_EXPR_CREATE.as_str()) + .table_name(self.get_lock_table_name()) + .set_item(Some(item)) + .send() + .await?; + Ok(()) + }, + |err: &SdkError<_, _>| match err.as_service_error() { + Some(PutItemError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await + .map_err(|err| match err.as_service_error() { + Some(PutItemError::ProvisionedThroughputExceededException(_)) => { + LockClientError::ProvisionedThroughputExceeded } + Some(PutItemError::ConditionalCheckFailedException(_)) => { + LockClientError::VersionAlreadyExists { + table_path: table_path.to_owned(), + version: entry.version, + } + } + Some(PutItemError::ResourceNotFoundException(_)) => LockClientError::LockTableNotFound, + _ => err.into(), }) - .await } /// Get the latest entry (entry with highest version). @@ -365,33 +368,33 @@ impl DynamoDbLockClient { limit: i64, ) -> Result, LockClientError> { let query_result = self - .retry(|| async { - match self - .dynamodb_client - .query() - .table_name(self.get_lock_table_name()) - .consistent_read(true) - .limit(limit.try_into().unwrap_or(i32::MAX)) - .scan_index_forward(false) - .key_condition_expression(format!("{} = :tn", constants::ATTR_TABLE_PATH)) - .set_expression_attribute_values(Some( - maplit::hashmap!(":tn".into() => string_attr(table_path)), - )) - .send() - .await - { - Ok(result) => Ok(result), - Err(sdk_err) => match sdk_err.as_service_error() { - Some(QueryError::ProvisionedThroughputExceededException(_)) => { - Err(backoff::Error::transient( - LockClientError::ProvisionedThroughputExceeded, - )) - } - _ => Err(backoff::Error::permanent(sdk_err.into())), - }, + .retry( + || async { + self.dynamodb_client + .query() + .table_name(self.get_lock_table_name()) + .consistent_read(true) + .limit(limit.try_into().unwrap_or(i32::MAX)) + .scan_index_forward(false) + .key_condition_expression(format!("{} = :tn", constants::ATTR_TABLE_PATH)) + .set_expression_attribute_values(Some( + maplit::hashmap!(":tn".into() => string_attr(table_path)), + )) + .send() + .await + }, + |err: &SdkError<_, _>| match err.as_service_error() { + Some(QueryError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await + .map_err(|err| match err.as_service_error() { + Some(QueryError::ProvisionedThroughputExceededException(_)) => { + LockClientError::ProvisionedThroughputExceeded } - }) - .await?; + _ => err.into(), + })?; query_result .items @@ -412,35 +415,44 @@ impl DynamoDbLockClient { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); - self.retry(|| async { - match self - .dynamodb_client - .update_item() - .table_name(self.get_lock_table_name()) - .set_key(Some(self.get_primary_key(version, table_path))) - .update_expression("SET complete = :c, expireTime = :e".to_owned()) - .set_expression_attribute_values(Some(maplit::hashmap! { - ":c".to_owned() => string_attr("true"), - ":e".to_owned() => num_attr(seconds_since_epoch), - ":f".into() => string_attr("false"), - })) - .condition_expression(constants::CONDITION_UPDATE_INCOMPLETE) - .send() - .await - { - Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed), - Err(err) => match err.as_service_error() { - Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => Err( - backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), - ), - Some(UpdateItemError::ConditionalCheckFailedException(_)) => { - Ok(UpdateLogEntryResult::AlreadyCompleted) - } - _ => Err(backoff::Error::permanent(err.into())), + let res = self + .retry( + || async { + let _ = self + .dynamodb_client + .update_item() + .table_name(self.get_lock_table_name()) + .set_key(Some(self.get_primary_key(version, table_path))) + .update_expression("SET complete = :c, expireTime = :e".to_owned()) + .set_expression_attribute_values(Some(maplit::hashmap! { + ":c".to_owned() => string_attr("true"), + ":e".to_owned() => num_attr(seconds_since_epoch), + ":f".into() => string_attr("false"), + })) + .condition_expression(constants::CONDITION_UPDATE_INCOMPLETE) + .send() + .await?; + Ok(()) }, - } - }) - .await + |err: &SdkError<_, _>| match err.as_service_error() { + Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await; + + match res { + Ok(()) => Ok(UpdateLogEntryResult::UpdatePerformed), + Err(err) => match err.as_service_error() { + Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => { + Err(LockClientError::ProvisionedThroughputExceeded) + } + Some(UpdateItemError::ConditionalCheckFailedException(_)) => { + Ok(UpdateLogEntryResult::AlreadyCompleted) + } + _ => Err(err.into()), + }, + } } /// Delete existing log entry if it is not already complete @@ -449,48 +461,52 @@ impl DynamoDbLockClient { version: i64, table_path: &str, ) -> Result<(), LockClientError> { - self.retry(|| async { - match self - .dynamodb_client - .delete_item() - .table_name(self.get_lock_table_name()) - .set_key(Some(self.get_primary_key(version, table_path))) - .set_expression_attribute_values(Some(maplit::hashmap! { - ":f".into() => string_attr("false"), - })) - .condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str()) - .send() - .await - { - Ok(_) => Ok(()), - Err(err) => match err.as_service_error() { - Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => Err( - backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), - ), - Some(DeleteItemError::ConditionalCheckFailedException(_)) => Err( - backoff::Error::permanent(LockClientError::VersionAlreadyCompleted { - table_path: table_path.to_owned(), - version, - }), - ), - _ => Err(backoff::Error::permanent(err.into())), - }, + self.retry( + || async { + let _ = self + .dynamodb_client + .delete_item() + .table_name(self.get_lock_table_name()) + .set_key(Some(self.get_primary_key(version, table_path))) + .set_expression_attribute_values(Some(maplit::hashmap! { + ":f".into() => string_attr("false"), + })) + .condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str()) + .send() + .await?; + Ok(()) + }, + |err: &SdkError<_, _>| match err.as_service_error() { + Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => true, + _ => false, + }, + ) + .await + .map_err(|err| match err.as_service_error() { + Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => { + LockClientError::ProvisionedThroughputExceeded + } + Some(DeleteItemError::ConditionalCheckFailedException(_)) => { + LockClientError::VersionAlreadyCompleted { + table_path: table_path.to_owned(), + version, + } } + _ => err.into(), }) - .await } - async fn retry(&self, operation: Fn) -> Result + async fn retry(&self, operation: F, when: Wn) -> Result where - Fn: FnMut() -> Fut, - Fut: std::future::Future>>, + F: FnMut() -> Fut, + Fut: std::future::Future>, + Wn: Fn(&E) -> bool, { - let backoff = backoff::ExponentialBackoffBuilder::new() - .with_multiplier(2.) - .with_max_interval(Duration::from_secs(15)) - .with_max_elapsed_time(Some(self.config.max_elapsed_request_time)) - .build(); - backoff::future::retry(backoff, operation).await + use backon::Retryable; + let backoff = backon::ExponentialBuilder::default() + .with_factor(2.) + .with_max_delay(self.config.max_elapsed_request_time); + operation.retry(backoff).when(when).await } }