diff --git a/crates/deltalake-aws/Cargo.toml b/crates/deltalake-aws/Cargo.toml index 97cd60f1e2..07a2c9e8bd 100644 --- a/crates/deltalake-aws/Cargo.toml +++ b/crates/deltalake-aws/Cargo.toml @@ -23,6 +23,7 @@ tokio = { workspace = true } regex = { workspace = true } uuid = { workspace = true, features = ["serde", "v4"] } url = { workspace = true } +backoff = { version = "0.4", features = [ "tokio" ] } [dev-dependencies] chrono = { workspace = true } diff --git a/crates/deltalake-aws/helpers.rs b/crates/deltalake-aws/helpers.rs index 1ba5d1cd15..1e6d88a340 100644 --- a/crates/deltalake-aws/helpers.rs +++ b/crates/deltalake-aws/helpers.rs @@ -39,7 +39,7 @@ pub async fn setup_s3_context() -> TestContext { config.insert("AWS_ACCESS_KEY_ID".to_owned(), "deltalake".to_owned()); config.insert("AWS_SECRET_ACCESS_KEY".to_owned(), "weloverust".to_owned()); config.insert("AWS_S3_LOCKING_PROVIDER".to_owned(), "dynamodb".to_owned()); - config.insert("DYNAMO_LOCK_TABLE_NAME".to_owned(), lock_table.clone()); + config.insert(constants::LOCK_TABLE_KEY_NAME.to_owned(), lock_table.clone()); config.insert("AWS_ALLOW_HTTP".to_owned(), "TRUE".to_string()); TestContext { diff --git a/crates/deltalake-aws/src/errors.rs b/crates/deltalake-aws/src/errors.rs index 7ba4fa5403..bbce9dc426 100644 --- a/crates/deltalake-aws/src/errors.rs +++ b/crates/deltalake-aws/src/errors.rs @@ -1,5 +1,7 @@ //! Errors for S3 log store backed by DynamoDb +use std::num::ParseIntError; + use rusoto_core::RusotoError; use rusoto_dynamodb::{CreateTableError, GetItemError, PutItemError, QueryError, UpdateItemError}; @@ -24,6 +26,13 @@ pub enum DynamoDbConfigError { /// Billing mode string invalid #[error("Invalid billing mode : {0}, supported values : ['provided', 'pay_per_request']")] InvalidBillingMode(String), + + /// Cannot parse max_elapsed_request_time value into u64 + #[error("Cannot parse max elapsed request time into u64: {source}")] + ParseMaxElapsedRequestTime { + // config_value: String, + source: ParseIntError, + }, } /// Errors produced by `DynamoDbLockClient` diff --git a/crates/deltalake-aws/src/lib.rs b/crates/deltalake-aws/src/lib.rs index 9e7a2f6005..dd33854456 100644 --- a/crates/deltalake-aws/src/lib.rs +++ b/crates/deltalake-aws/src/lib.rs @@ -21,8 +21,8 @@ use rusoto_core::{HttpClient, Region, RusotoError}; use rusoto_credential::AutoRefreshingProvider; use rusoto_dynamodb::{ AttributeDefinition, AttributeValue, CreateTableError, CreateTableInput, DynamoDb, - DynamoDbClient, GetItemInput, KeySchemaElement, PutItemError, PutItemInput, QueryInput, - UpdateItemError, UpdateItemInput, + DynamoDbClient, GetItemError, GetItemInput, KeySchemaElement, PutItemError, PutItemInput, + QueryError, QueryInput, UpdateItemError, UpdateItemInput, }; use rusoto_sts::WebIdentityProvider; use url::Url; @@ -119,32 +119,37 @@ impl std::fmt::Debug for DynamoDbLockClient { impl DynamoDbLockClient { /// Creates a new DynamoDbLockClient from the supplied storage options. pub fn try_new( - lock_table_name: Option<&String>, - billing_mode: Option<&String>, + lock_table_name: Option, + billing_mode: Option, + max_elapsed_request_time: Option, region: Region, use_web_identity: bool, ) -> Result { let dynamodb_client = create_dynamodb_client(region.clone(), use_web_identity)?; - let lock_table_name = lock_table_name.map_or_else( - || { - std::env::var(constants::LOCK_TABLE_KEY_NAME) - .unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned()) - }, - Clone::clone, - ); - let billing_mode: BillingMode = billing_mode.map_or_else( - || { - std::env::var(constants::BILLING_MODE_KEY_NAME).map_or_else( - |_| Ok(BillingMode::PayPerRequest), - |bm| BillingMode::from_str(&bm), - ) - }, - |bm| BillingMode::from_str(bm), - )?; + let lock_table_name = lock_table_name + .or_else(|| std::env::var(constants::LOCK_TABLE_KEY_NAME).ok()) + .unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned()); + + let billing_mode = billing_mode + .or_else(|| std::env::var(constants::BILLING_MODE_KEY_NAME).ok()) + .map_or_else( + || Ok(BillingMode::PayPerRequest), + |bm| BillingMode::from_str(&bm), + )?; + + let max_elapsed_request_time = max_elapsed_request_time + .or_else(|| std::env::var(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME).ok()) + .map_or_else( + || Ok(Duration::from_secs(60)), + |secs| u64::from_str(&secs).map(Duration::from_secs), + ) + .map_err(|err| DynamoDbConfigError::ParseMaxElapsedRequestTime { source: err })?; + let config = DynamoDbConfig { billing_mode, lock_table_name, + max_elapsed_request_time, use_web_identity, region, }; @@ -156,7 +161,7 @@ impl DynamoDbLockClient { /// Create the lock table where DynamoDb stores the commit information for all delta tables. /// - /// Transparently handles the case where that table already exists, so it's to call. + /// Transparently handles the case where that table already exists, so it's safe to call. /// After `create_table` operation is executed, the table state in DynamoDb is `creating`, and /// it's not immediately useable. This method does not wait for the table state to become /// `active`, so transient failures might occurr when immediately using the lock client. @@ -204,6 +209,10 @@ impl DynamoDbLockClient { self.config.lock_table_name.clone() } + pub fn get_dynamodb_config(&self) -> &DynamoDbConfig { + &self.config + } + fn get_primary_key(&self, version: i64, table_path: &str) -> HashMap { maplit::hashmap! { constants::ATTR_TABLE_PATH.to_owned() => string_attr(table_path), @@ -223,7 +232,19 @@ impl DynamoDbLockClient { key: self.get_primary_key(version, table_path), ..Default::default() }; - let item = self.dynamodb_client.get_item(input).await?; + let item = self + .retry(|| async { + match self.dynamodb_client.get_item(input.clone()).await { + Ok(x) => Ok(x), + Err(RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(_))) => { + Err(backoff::Error::transient( + LockClientError::ProvisionedThroughputExceeded, + )) + } + Err(err) => Err(backoff::Error::permanent(err.into())), + } + }) + .await?; item.item.as_ref().map(CommitEntry::try_from).transpose() } @@ -240,22 +261,25 @@ impl DynamoDbLockClient { item, ..Default::default() }; - match self.dynamodb_client.put_item(input).await { - Ok(_) => Ok(()), - Err(RusotoError::Service(PutItemError::ConditionalCheckFailed(_))) => { - Err(LockClientError::VersionAlreadyExists { - table_path: table_path.to_owned(), - version: entry.version, - }) - } - Err(RusotoError::Service(PutItemError::ProvisionedThroughputExceeded(_))) => { - Err(LockClientError::ProvisionedThroughputExceeded) - } - Err(RusotoError::Service(PutItemError::ResourceNotFound(_))) => { - Err(LockClientError::LockTableNotFound) + self.retry(|| async { + match self.dynamodb_client.put_item(input.clone()).await { + Ok(_) => Ok(()), + Err(RusotoError::Service(PutItemError::ProvisionedThroughputExceeded(_))) => Err( + backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), + ), + Err(RusotoError::Service(PutItemError::ConditionalCheckFailed(_))) => Err( + backoff::Error::permanent(LockClientError::VersionAlreadyExists { + table_path: table_path.to_owned(), + version: entry.version, + }), + ), + Err(RusotoError::Service(PutItemError::ResourceNotFound(_))) => Err( + backoff::Error::permanent(LockClientError::LockTableNotFound), + ), + Err(err) => Err(backoff::Error::permanent(err.into())), } - Err(err) => Err(err.into()), - } + }) + .await } /// Get the latest entry (entry with highest version). @@ -287,7 +311,18 @@ impl DynamoDbLockClient { ), ..Default::default() }; - let query_result = self.dynamodb_client.query(input).await?; + let query_result = self + .retry(|| async { + match self.dynamodb_client.query(input.clone()).await { + Ok(result) => Ok(result), + Err(RusotoError::Service(QueryError::ProvisionedThroughputExceeded(_))) => Err( + backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded), + ), + Err(err) => Err(backoff::Error::permanent(err.into())), + } + }) + .await?; + query_result .items .unwrap() @@ -320,13 +355,34 @@ impl DynamoDbLockClient { ..Default::default() }; - match self.dynamodb_client.update_item(input).await { - Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed), - Err(RusotoError::Service(UpdateItemError::ConditionalCheckFailed(_))) => { - Ok(UpdateLogEntryResult::AlreadyCompleted) + self.retry(|| async { + match self.dynamodb_client.update_item(input.clone()).await { + Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed), + Err(RusotoError::Service(UpdateItemError::ConditionalCheckFailed(_))) => { + Ok(UpdateLogEntryResult::AlreadyCompleted) + } + Err(RusotoError::Service(UpdateItemError::ProvisionedThroughputExceeded(_))) => { + Err(backoff::Error::transient( + LockClientError::ProvisionedThroughputExceeded, + )) + } + Err(err) => Err(backoff::Error::permanent(err.into())), } - Err(err) => Err(err)?, - } + }) + .await + } + + async fn retry(&self, operation: Fn) -> Result + where + Fn: FnMut() -> Fut, + Fut: std::future::Future>>, + { + let backoff = backoff::ExponentialBackoffBuilder::new() + .with_multiplier(2.) + .with_max_interval(Duration::from_secs(15)) + .with_max_elapsed_time(Some(self.config.max_elapsed_request_time)) + .build(); + backoff::future::retry(backoff, operation).await } } @@ -400,7 +456,7 @@ fn create_value_map( } #[derive(Debug, PartialEq)] -enum BillingMode { +pub enum BillingMode { PayPerRequest, Provisioned, } @@ -428,10 +484,11 @@ impl FromStr for BillingMode { #[derive(Debug, PartialEq)] pub struct DynamoDbConfig { - billing_mode: BillingMode, - lock_table_name: String, - use_web_identity: bool, - region: Region, + pub billing_mode: BillingMode, + pub lock_table_name: String, + pub max_elapsed_request_time: Duration, + pub use_web_identity: bool, + pub region: Region, } /// Represents the possible, positive outcomes of calling `DynamoDbClient::try_create_lock_table()` @@ -452,6 +509,7 @@ pub mod constants { pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log"; pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME"; pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE"; + pub const MAX_ELAPSED_REQUEST_TIME_KEY_NAME: &str = "DELTA_DYNAMO_MAX_ELAPSED_REQUEST_TIME"; pub const ATTR_TABLE_PATH: &str = "tablePath"; pub const ATTR_FILE_NAME: &str = "fileName"; diff --git a/crates/deltalake-aws/src/logstore.rs b/crates/deltalake-aws/src/logstore.rs index a9cb4154a9..8e02659d87 100644 --- a/crates/deltalake-aws/src/logstore.rs +++ b/crates/deltalake-aws/src/logstore.rs @@ -45,8 +45,18 @@ impl S3DynamoDbLogStore { object_store: ObjectStoreRef, ) -> DeltaResult { let lock_client = DynamoDbLockClient::try_new( - s3_options.extra_opts.get(constants::LOCK_TABLE_KEY_NAME), - s3_options.extra_opts.get(constants::BILLING_MODE_KEY_NAME), + s3_options + .extra_opts + .get(constants::LOCK_TABLE_KEY_NAME) + .cloned(), + s3_options + .extra_opts + .get(constants::BILLING_MODE_KEY_NAME) + .cloned(), + s3_options + .extra_opts + .get(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME) + .cloned(), s3_options.region.clone(), s3_options.use_web_identity, ) @@ -56,7 +66,6 @@ impl S3DynamoDbLogStore { source: err.into(), }, })?; - debug!("S3DynamoDbLogStore configured with lock client: {lock_client:?}"); let table_path = to_uri(&location, &Path::from("")); Ok(Self { storage: object_store, diff --git a/crates/deltalake-aws/tests/common.rs b/crates/deltalake-aws/tests/common.rs index 764c861c92..c294ca5d99 100644 --- a/crates/deltalake-aws/tests/common.rs +++ b/crates/deltalake-aws/tests/common.rs @@ -1,4 +1,5 @@ use chrono::Utc; +use deltalake_aws::constants; use deltalake_aws::register_handlers; use deltalake_aws::storage::*; use deltalake_test::utils::*; @@ -44,8 +45,8 @@ impl StorageIntegration for S3Integration { /// prepare_env fn prepare_env(&self) { - std::env::set_var( - "DELTA_DYNAMO_TABLE_NAME", + set_env_if_not_set( + constants::LOCK_TABLE_KEY_NAME, format!("delta_log_it_{}", rand::thread_rng().gen::()), ); match std::env::var(s3_constants::AWS_ENDPOINT_URL).ok() { @@ -59,9 +60,6 @@ impl StorageIntegration for S3Integration { set_env_if_not_set(s3_constants::AWS_SECRET_ACCESS_KEY, "weloverust"); set_env_if_not_set(s3_constants::AWS_REGION, "us-east-1"); set_env_if_not_set(s3_constants::AWS_S3_LOCKING_PROVIDER, "dynamodb"); - set_env_if_not_set("DYNAMO_LOCK_TABLE_NAME", "test_table"); - set_env_if_not_set("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100"); - set_env_if_not_set("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100"); } /// copy directory @@ -95,7 +93,7 @@ impl S3Integration { "--table-name", &table_name, "--provisioned-throughput", - "ReadCapacityUnits=10,WriteCapacityUnits=10", + "ReadCapacityUnits=1,WriteCapacityUnits=1", "--attribute-definitions", ]; let mut child = Command::new("aws") @@ -135,7 +133,7 @@ impl S3Integration { pub fn create_lock_table() -> std::io::Result { let table_name = - std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into()); + std::env::var(constants::LOCK_TABLE_KEY_NAME).unwrap_or_else(|_| "delta_log".into()); Self::create_dynamodb_table( &table_name, &[ @@ -160,7 +158,7 @@ impl S3Integration { pub fn delete_lock_table() -> std::io::Result { let table_name = - std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into()); + std::env::var(constants::LOCK_TABLE_KEY_NAME).unwrap_or_else(|_| "delta_log".into()); Self::delete_dynamodb_table(&table_name) } } diff --git a/crates/deltalake-aws/tests/integration_s3_dynamodb.rs b/crates/deltalake-aws/tests/integration_s3_dynamodb.rs index 7338ca1509..502607e868 100644 --- a/crates/deltalake-aws/tests/integration_s3_dynamodb.rs +++ b/crates/deltalake-aws/tests/integration_s3_dynamodb.rs @@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore}; use deltalake_aws::storage::S3StorageOptions; -use deltalake_aws::{CommitEntry, DynamoDbLockClient}; +use deltalake_aws::{CommitEntry, DynamoDbConfig, DynamoDbLockClient}; use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType}; use deltalake_core::logstore::LogStore; use deltalake_core::operations::transaction::{commit, prepare_commit}; @@ -35,21 +35,46 @@ lazy_static! { } fn make_client() -> TestResult { + let options: S3StorageOptions = S3StorageOptions::default(); Ok(DynamoDbLockClient::try_new( None, None, - S3_OPTIONS.region.clone(), + None, + options.region.clone(), false, )?) } #[test] #[serial] -fn client_config_picks_up_lock_table_name() -> TestResult<()> { - let _context = IntegrationContext::new(Box::new(S3Integration::default()))?; - assert!(make_client()? - .get_lock_table_name() - .starts_with("delta_log_it_")); +fn client_configs_via_env_variables() -> TestResult<()> { + std::env::set_var( + deltalake_aws::constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME, + "64", + ); + std::env::set_var( + deltalake_aws::constants::LOCK_TABLE_KEY_NAME, + "some_table".to_owned(), + ); + std::env::set_var( + deltalake_aws::constants::BILLING_MODE_KEY_NAME, + "PAY_PER_REQUEST".to_owned(), + ); + let client = make_client()?; + let config = client.get_dynamodb_config(); + assert_eq!( + DynamoDbConfig { + billing_mode: deltalake_aws::BillingMode::PayPerRequest, + lock_table_name: "some_table".to_owned(), + max_elapsed_request_time: Duration::from_secs(64), + use_web_identity: false, + region: config.region.clone(), + }, + *config, + ); + std::env::remove_var(deltalake_aws::constants::LOCK_TABLE_KEY_NAME); + std::env::remove_var(deltalake_aws::constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME); + std::env::remove_var(deltalake_aws::constants::BILLING_MODE_KEY_NAME); Ok(()) } @@ -143,7 +168,7 @@ async fn test_repair_on_update() -> TestResult<()> { } const WORKERS: i64 = 3; -const COMMITS: i64 = 5; +const COMMITS: i64 = 15; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial]