Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: retry with exponential backoff for DynamoDb interaction #1975

Merged
merged 2 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/deltalake-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ tokio = { workspace = true }
regex = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }
backoff = { version = "0.4", features = [ "tokio" ] }

[dev-dependencies]
chrono = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-aws/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn setup_s3_context() -> TestContext {
config.insert("AWS_ACCESS_KEY_ID".to_owned(), "deltalake".to_owned());
config.insert("AWS_SECRET_ACCESS_KEY".to_owned(), "weloverust".to_owned());
config.insert("AWS_S3_LOCKING_PROVIDER".to_owned(), "dynamodb".to_owned());
config.insert("DYNAMO_LOCK_TABLE_NAME".to_owned(), lock_table.clone());
config.insert(constants::LOCK_TABLE_KEY_NAME.to_owned(), lock_table.clone());
config.insert("AWS_ALLOW_HTTP".to_owned(), "TRUE".to_string());

TestContext {
Expand Down
9 changes: 9 additions & 0 deletions crates/deltalake-aws/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Errors for S3 log store backed by DynamoDb
use std::num::ParseIntError;

use rusoto_core::RusotoError;
use rusoto_dynamodb::{CreateTableError, GetItemError, PutItemError, QueryError, UpdateItemError};

Expand All @@ -24,6 +26,13 @@ pub enum DynamoDbConfigError {
/// Billing mode string invalid
#[error("Invalid billing mode : {0}, supported values : ['provided', 'pay_per_request']")]
InvalidBillingMode(String),

/// Cannot parse max_elapsed_request_time value into u64
#[error("Cannot parse max elapsed request time into u64: {source}")]
ParseMaxElapsedRequestTime {
// config_value: String,
source: ParseIntError,
},
}

/// Errors produced by `DynamoDbLockClient`
Expand Down
156 changes: 107 additions & 49 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_credential::AutoRefreshingProvider;
use rusoto_dynamodb::{
AttributeDefinition, AttributeValue, CreateTableError, CreateTableInput, DynamoDb,
DynamoDbClient, GetItemInput, KeySchemaElement, PutItemError, PutItemInput, QueryInput,
UpdateItemError, UpdateItemInput,
DynamoDbClient, GetItemError, GetItemInput, KeySchemaElement, PutItemError, PutItemInput,
QueryError, QueryInput, UpdateItemError, UpdateItemInput,
};
use rusoto_sts::WebIdentityProvider;
use url::Url;
Expand Down Expand Up @@ -119,32 +119,37 @@ impl std::fmt::Debug for DynamoDbLockClient {
impl DynamoDbLockClient {
/// Creates a new DynamoDbLockClient from the supplied storage options.
pub fn try_new(
lock_table_name: Option<&String>,
billing_mode: Option<&String>,
lock_table_name: Option<String>,
billing_mode: Option<String>,
max_elapsed_request_time: Option<String>,
region: Region,
use_web_identity: bool,
) -> Result<Self, DynamoDbConfigError> {
let dynamodb_client = create_dynamodb_client(region.clone(), use_web_identity)?;
let lock_table_name = lock_table_name.map_or_else(
|| {
std::env::var(constants::LOCK_TABLE_KEY_NAME)
.unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned())
},
Clone::clone,
);

let billing_mode: BillingMode = billing_mode.map_or_else(
|| {
std::env::var(constants::BILLING_MODE_KEY_NAME).map_or_else(
|_| Ok(BillingMode::PayPerRequest),
|bm| BillingMode::from_str(&bm),
)
},
|bm| BillingMode::from_str(bm),
)?;
let lock_table_name = lock_table_name
.or_else(|| std::env::var(constants::LOCK_TABLE_KEY_NAME).ok())
.unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned());

let billing_mode = billing_mode
.or_else(|| std::env::var(constants::BILLING_MODE_KEY_NAME).ok())
.map_or_else(
|| Ok(BillingMode::PayPerRequest),
|bm| BillingMode::from_str(&bm),
)?;

let max_elapsed_request_time = max_elapsed_request_time
.or_else(|| std::env::var(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME).ok())
.map_or_else(
|| Ok(Duration::from_secs(60)),
|secs| u64::from_str(&secs).map(Duration::from_secs),
)
.map_err(|err| DynamoDbConfigError::ParseMaxElapsedRequestTime { source: err })?;

let config = DynamoDbConfig {
billing_mode,
lock_table_name,
max_elapsed_request_time,
use_web_identity,
region,
};
Expand All @@ -156,7 +161,7 @@ impl DynamoDbLockClient {

/// Create the lock table where DynamoDb stores the commit information for all delta tables.
///
/// Transparently handles the case where that table already exists, so it's to call.
/// Transparently handles the case where that table already exists, so it's safe to call.
/// After `create_table` operation is executed, the table state in DynamoDb is `creating`, and
/// it's not immediately useable. This method does not wait for the table state to become
/// `active`, so transient failures might occurr when immediately using the lock client.
Expand Down Expand Up @@ -204,6 +209,10 @@ impl DynamoDbLockClient {
self.config.lock_table_name.clone()
}

pub fn get_dynamodb_config(&self) -> &DynamoDbConfig {
&self.config
}

fn get_primary_key(&self, version: i64, table_path: &str) -> HashMap<String, AttributeValue> {
maplit::hashmap! {
constants::ATTR_TABLE_PATH.to_owned() => string_attr(table_path),
Expand All @@ -223,7 +232,19 @@ impl DynamoDbLockClient {
key: self.get_primary_key(version, table_path),
..Default::default()
};
let item = self.dynamodb_client.get_item(input).await?;
let item = self
.retry(|| async {
match self.dynamodb_client.get_item(input.clone()).await {
Ok(x) => Ok(x),
Err(RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(_))) => {
Err(backoff::Error::transient(
LockClientError::ProvisionedThroughputExceeded,
))
}
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;
item.item.as_ref().map(CommitEntry::try_from).transpose()
}

Expand All @@ -240,22 +261,25 @@ impl DynamoDbLockClient {
item,
..Default::default()
};
match self.dynamodb_client.put_item(input).await {
Ok(_) => Ok(()),
Err(RusotoError::Service(PutItemError::ConditionalCheckFailed(_))) => {
Err(LockClientError::VersionAlreadyExists {
table_path: table_path.to_owned(),
version: entry.version,
})
}
Err(RusotoError::Service(PutItemError::ProvisionedThroughputExceeded(_))) => {
Err(LockClientError::ProvisionedThroughputExceeded)
}
Err(RusotoError::Service(PutItemError::ResourceNotFound(_))) => {
Err(LockClientError::LockTableNotFound)
self.retry(|| async {
match self.dynamodb_client.put_item(input.clone()).await {
Ok(_) => Ok(()),
Err(RusotoError::Service(PutItemError::ProvisionedThroughputExceeded(_))) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Err(RusotoError::Service(PutItemError::ConditionalCheckFailed(_))) => Err(
backoff::Error::permanent(LockClientError::VersionAlreadyExists {
table_path: table_path.to_owned(),
version: entry.version,
}),
),
Err(RusotoError::Service(PutItemError::ResourceNotFound(_))) => Err(
backoff::Error::permanent(LockClientError::LockTableNotFound),
),
Err(err) => Err(backoff::Error::permanent(err.into())),
}
Err(err) => Err(err.into()),
}
})
.await
}

/// Get the latest entry (entry with highest version).
Expand Down Expand Up @@ -287,7 +311,18 @@ impl DynamoDbLockClient {
),
..Default::default()
};
let query_result = self.dynamodb_client.query(input).await?;
let query_result = self
.retry(|| async {
match self.dynamodb_client.query(input.clone()).await {
Ok(result) => Ok(result),
Err(RusotoError::Service(QueryError::ProvisionedThroughputExceeded(_))) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;

query_result
.items
.unwrap()
Expand Down Expand Up @@ -320,13 +355,34 @@ impl DynamoDbLockClient {
..Default::default()
};

match self.dynamodb_client.update_item(input).await {
Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed),
Err(RusotoError::Service(UpdateItemError::ConditionalCheckFailed(_))) => {
Ok(UpdateLogEntryResult::AlreadyCompleted)
self.retry(|| async {
match self.dynamodb_client.update_item(input.clone()).await {
Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed),
Err(RusotoError::Service(UpdateItemError::ConditionalCheckFailed(_))) => {
Ok(UpdateLogEntryResult::AlreadyCompleted)
}
Err(RusotoError::Service(UpdateItemError::ProvisionedThroughputExceeded(_))) => {
Err(backoff::Error::transient(
LockClientError::ProvisionedThroughputExceeded,
))
}
Err(err) => Err(backoff::Error::permanent(err.into())),
}
Err(err) => Err(err)?,
}
})
.await
}

async fn retry<I, E, Fn, Fut>(&self, operation: Fn) -> Result<I, E>
where
Fn: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<I, backoff::Error<E>>>,
{
let backoff = backoff::ExponentialBackoffBuilder::new()
.with_multiplier(2.)
.with_max_interval(Duration::from_secs(15))
.with_max_elapsed_time(Some(self.config.max_elapsed_request_time))
.build();
backoff::future::retry(backoff, operation).await
}
}

Expand Down Expand Up @@ -400,7 +456,7 @@ fn create_value_map(
}

#[derive(Debug, PartialEq)]
enum BillingMode {
pub enum BillingMode {
PayPerRequest,
Provisioned,
}
Expand Down Expand Up @@ -428,10 +484,11 @@ impl FromStr for BillingMode {

#[derive(Debug, PartialEq)]
pub struct DynamoDbConfig {
billing_mode: BillingMode,
lock_table_name: String,
use_web_identity: bool,
region: Region,
pub billing_mode: BillingMode,
pub lock_table_name: String,
pub max_elapsed_request_time: Duration,
pub use_web_identity: bool,
pub region: Region,
}

/// Represents the possible, positive outcomes of calling `DynamoDbClient::try_create_lock_table()`
Expand All @@ -452,6 +509,7 @@ pub mod constants {
pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log";
pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME";
pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE";
pub const MAX_ELAPSED_REQUEST_TIME_KEY_NAME: &str = "DELTA_DYNAMO_MAX_ELAPSED_REQUEST_TIME";

pub const ATTR_TABLE_PATH: &str = "tablePath";
pub const ATTR_FILE_NAME: &str = "fileName";
Expand Down
15 changes: 12 additions & 3 deletions crates/deltalake-aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,18 @@ impl S3DynamoDbLogStore {
object_store: ObjectStoreRef,
) -> DeltaResult<Self> {
let lock_client = DynamoDbLockClient::try_new(
s3_options.extra_opts.get(constants::LOCK_TABLE_KEY_NAME),
s3_options.extra_opts.get(constants::BILLING_MODE_KEY_NAME),
s3_options
.extra_opts
.get(constants::LOCK_TABLE_KEY_NAME)
.cloned(),
s3_options
.extra_opts
.get(constants::BILLING_MODE_KEY_NAME)
.cloned(),
s3_options
.extra_opts
.get(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME)
.cloned(),
s3_options.region.clone(),
s3_options.use_web_identity,
)
Expand All @@ -56,7 +66,6 @@ impl S3DynamoDbLogStore {
source: err.into(),
},
})?;
debug!("S3DynamoDbLogStore configured with lock client: {lock_client:?}");
let table_path = to_uri(&location, &Path::from(""));
Ok(Self {
storage: object_store,
Expand Down
14 changes: 6 additions & 8 deletions crates/deltalake-aws/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::Utc;
use deltalake_aws::constants;
use deltalake_aws::register_handlers;
use deltalake_aws::storage::*;
use deltalake_test::utils::*;
Expand Down Expand Up @@ -44,8 +45,8 @@ impl StorageIntegration for S3Integration {

/// prepare_env
fn prepare_env(&self) {
std::env::set_var(
"DELTA_DYNAMO_TABLE_NAME",
set_env_if_not_set(
constants::LOCK_TABLE_KEY_NAME,
format!("delta_log_it_{}", rand::thread_rng().gen::<u16>()),
);
match std::env::var(s3_constants::AWS_ENDPOINT_URL).ok() {
Expand All @@ -59,9 +60,6 @@ impl StorageIntegration for S3Integration {
set_env_if_not_set(s3_constants::AWS_SECRET_ACCESS_KEY, "weloverust");
set_env_if_not_set(s3_constants::AWS_REGION, "us-east-1");
set_env_if_not_set(s3_constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
set_env_if_not_set("DYNAMO_LOCK_TABLE_NAME", "test_table");
set_env_if_not_set("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100");
set_env_if_not_set("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100");
}

/// copy directory
Expand Down Expand Up @@ -95,7 +93,7 @@ impl S3Integration {
"--table-name",
&table_name,
"--provisioned-throughput",
"ReadCapacityUnits=10,WriteCapacityUnits=10",
"ReadCapacityUnits=1,WriteCapacityUnits=1",
"--attribute-definitions",
];
let mut child = Command::new("aws")
Expand Down Expand Up @@ -135,7 +133,7 @@ impl S3Integration {

pub fn create_lock_table() -> std::io::Result<ExitStatus> {
let table_name =
std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
std::env::var(constants::LOCK_TABLE_KEY_NAME).unwrap_or_else(|_| "delta_log".into());
Self::create_dynamodb_table(
&table_name,
&[
Expand All @@ -160,7 +158,7 @@ impl S3Integration {

pub fn delete_lock_table() -> std::io::Result<ExitStatus> {
let table_name =
std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
std::env::var(constants::LOCK_TABLE_KEY_NAME).unwrap_or_else(|_| "delta_log".into());
Self::delete_dynamodb_table(&table_name)
}
}
Expand Down
Loading
Loading