Skip to content

Commit

Permalink
Add configurable max elapsed time
Browse files Browse the repository at this point in the history
  • Loading branch information
dispanser committed Jan 4, 2024
1 parent 62c2e15 commit 251dded
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 67 deletions.
9 changes: 9 additions & 0 deletions crates/deltalake-aws/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Errors for S3 log store backed by DynamoDb
use std::num::ParseIntError;

use rusoto_core::RusotoError;
use rusoto_dynamodb::{CreateTableError, GetItemError, PutItemError, QueryError, UpdateItemError};

Expand All @@ -24,6 +26,13 @@ pub enum DynamoDbConfigError {
/// Billing mode string invalid
#[error("Invalid billing mode : {0}, supported values : ['provided', 'pay_per_request']")]
InvalidBillingMode(String),

/// Cannot parse max_elapsed_request_time value into u64
#[error("Cannot parse max elapsed request time into u64: {source}")]
ParseMaxElapsedRequestTime {
// config_value: String,
source: ParseIntError,
},
}

/// Errors produced by `DynamoDbLockClient`
Expand Down
129 changes: 72 additions & 57 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,32 +119,37 @@ impl std::fmt::Debug for DynamoDbLockClient {
impl DynamoDbLockClient {
/// Creates a new DynamoDbLockClient from the supplied storage options.
pub fn try_new(
lock_table_name: Option<&String>,
billing_mode: Option<&String>,
lock_table_name: Option<String>,
billing_mode: Option<String>,
max_elapsed_request_time: Option<String>,
region: Region,
use_web_identity: bool,
) -> Result<Self, DynamoDbConfigError> {
let dynamodb_client = create_dynamodb_client(region.clone(), use_web_identity)?;
let lock_table_name = lock_table_name.map_or_else(
|| {
std::env::var(constants::LOCK_TABLE_KEY_NAME)
.unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned())
},
Clone::clone,
);

let billing_mode: BillingMode = billing_mode.map_or_else(
|| {
std::env::var(constants::BILLING_MODE_KEY_NAME).map_or_else(
|_| Ok(BillingMode::PayPerRequest),
|bm| BillingMode::from_str(&bm),
)
},
|bm| BillingMode::from_str(bm),
)?;
let lock_table_name = lock_table_name
.or_else(|| std::env::var(constants::LOCK_TABLE_KEY_NAME).ok())
.unwrap_or(constants::DEFAULT_LOCK_TABLE_NAME.to_owned());

let billing_mode = billing_mode
.or_else(|| std::env::var(constants::BILLING_MODE_KEY_NAME).ok())
.map_or_else(
|| Ok(BillingMode::PayPerRequest),
|bm| BillingMode::from_str(&bm),
)?;

let max_elapsed_request_time = max_elapsed_request_time
.or_else(|| std::env::var(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME).ok())
.map_or_else(
|| Ok(Duration::from_secs(60)),
|secs| u64::from_str(&secs).map(Duration::from_secs),
)
.map_err(|err| DynamoDbConfigError::ParseMaxElapsedRequestTime { source: err })?;

let config = DynamoDbConfig {
billing_mode,
lock_table_name,
max_elapsed_request_time,
use_web_identity,
region,
};
Expand Down Expand Up @@ -204,6 +209,10 @@ impl DynamoDbLockClient {
self.config.lock_table_name.clone()
}

pub fn get_dynamodb_config(&self) -> &DynamoDbConfig {
&self.config
}

fn get_primary_key(&self, version: i64, table_path: &str) -> HashMap<String, AttributeValue> {
maplit::hashmap! {
constants::ATTR_TABLE_PATH.to_owned() => string_attr(table_path),
Expand All @@ -223,16 +232,19 @@ impl DynamoDbLockClient {
key: self.get_primary_key(version, table_path),
..Default::default()
};
let item = retry(|| async {
match self.dynamodb_client.get_item(input.clone()).await {
Ok(x) => Ok(x),
Err(RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(_))) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;
let item = self
.retry(|| async {
match self.dynamodb_client.get_item(input.clone()).await {
Ok(x) => Ok(x),
Err(RusotoError::Service(GetItemError::ProvisionedThroughputExceeded(_))) => {
Err(backoff::Error::transient(
LockClientError::ProvisionedThroughputExceeded,
))
}
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;
item.item.as_ref().map(CommitEntry::try_from).transpose()
}

Expand All @@ -249,7 +261,7 @@ impl DynamoDbLockClient {
item,
..Default::default()
};
retry(|| async {
self.retry(|| async {
match self.dynamodb_client.put_item(input.clone()).await {
Ok(_) => Ok(()),
Err(RusotoError::Service(PutItemError::ProvisionedThroughputExceeded(_))) => Err(
Expand Down Expand Up @@ -299,16 +311,17 @@ impl DynamoDbLockClient {
),
..Default::default()
};
let query_result = retry(|| async {
match self.dynamodb_client.query(input.clone()).await {
Ok(result) => Ok(result),
Err(RusotoError::Service(QueryError::ProvisionedThroughputExceeded(_))) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;
let query_result = self
.retry(|| async {
match self.dynamodb_client.query(input.clone()).await {
Ok(result) => Ok(result),
Err(RusotoError::Service(QueryError::ProvisionedThroughputExceeded(_))) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Err(err) => Err(backoff::Error::permanent(err.into())),
}
})
.await?;

query_result
.items
Expand Down Expand Up @@ -342,7 +355,7 @@ impl DynamoDbLockClient {
..Default::default()
};

retry(|| async {
self.retry(|| async {
match self.dynamodb_client.update_item(input.clone()).await {
Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed),
Err(RusotoError::Service(UpdateItemError::ConditionalCheckFailed(_))) => {
Expand All @@ -358,19 +371,19 @@ impl DynamoDbLockClient {
})
.await
}
}

async fn retry<I, E, Fn, Fut>(operation: Fn) -> Result<I, E>
where
Fn: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<I, backoff::Error<E>>>,
{
let backoff = backoff::ExponentialBackoffBuilder::new()
.with_multiplier(2.)
.with_max_interval(Duration::from_secs(15))
.with_max_elapsed_time(Some(Duration::from_secs(60)))
.build();
backoff::future::retry(backoff, operation).await
async fn retry<I, E, Fn, Fut>(&self, operation: Fn) -> Result<I, E>
where
Fn: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<I, backoff::Error<E>>>,
{
let backoff = backoff::ExponentialBackoffBuilder::new()
.with_multiplier(2.)
.with_max_interval(Duration::from_secs(15))
.with_max_elapsed_time(Some(self.config.max_elapsed_request_time))
.build();
backoff::future::retry(backoff, operation).await
}
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -443,7 +456,7 @@ fn create_value_map(
}

#[derive(Debug, PartialEq)]
enum BillingMode {
pub enum BillingMode {
PayPerRequest,
Provisioned,
}
Expand Down Expand Up @@ -471,10 +484,11 @@ impl FromStr for BillingMode {

#[derive(Debug, PartialEq)]
pub struct DynamoDbConfig {
billing_mode: BillingMode,
lock_table_name: String,
use_web_identity: bool,
region: Region,
pub billing_mode: BillingMode,
pub lock_table_name: String,
pub max_elapsed_request_time: Duration,
pub use_web_identity: bool,
pub region: Region,
}

/// Represents the possible, positive outcomes of calling `DynamoDbClient::try_create_lock_table()`
Expand All @@ -495,6 +509,7 @@ pub mod constants {
pub const DEFAULT_LOCK_TABLE_NAME: &str = "delta_log";
pub const LOCK_TABLE_KEY_NAME: &str = "DELTA_DYNAMO_TABLE_NAME";
pub const BILLING_MODE_KEY_NAME: &str = "DELTA_DYNAMO_BILLING_MODE";
pub const MAX_ELAPSED_REQUEST_TIME_KEY_NAME: &str = "DELTA_DYNAMO_MAX_ELAPSED_REQUEST_TIME";

pub const ATTR_TABLE_PATH: &str = "tablePath";
pub const ATTR_FILE_NAME: &str = "fileName";
Expand Down
15 changes: 12 additions & 3 deletions crates/deltalake-aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,18 @@ impl S3DynamoDbLogStore {
object_store: ObjectStoreRef,
) -> DeltaResult<Self> {
let lock_client = DynamoDbLockClient::try_new(
s3_options.extra_opts.get(constants::LOCK_TABLE_KEY_NAME),
s3_options.extra_opts.get(constants::BILLING_MODE_KEY_NAME),
s3_options
.extra_opts
.get(constants::LOCK_TABLE_KEY_NAME)
.cloned(),
s3_options
.extra_opts
.get(constants::BILLING_MODE_KEY_NAME)
.cloned(),
s3_options
.extra_opts
.get(constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME)
.cloned(),
s3_options.region.clone(),
s3_options.use_web_identity,
)
Expand All @@ -56,7 +66,6 @@ impl S3DynamoDbLogStore {
source: err.into(),
},
})?;
debug!("S3DynamoDbLogStore configured with lock client: {lock_client:?}");
let table_path = to_uri(&location, &Path::from(""));
Ok(Self {
storage: object_store,
Expand Down
39 changes: 32 additions & 7 deletions crates/deltalake-aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use deltalake_aws::logstore::{RepairLogEntryResult, S3DynamoDbLogStore};
use deltalake_aws::storage::S3StorageOptions;
use deltalake_aws::{CommitEntry, DynamoDbLockClient};
use deltalake_aws::{CommitEntry, DynamoDbLockClient, DynamoDbConfig};
use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType};
use deltalake_core::logstore::LogStore;
use deltalake_core::operations::transaction::{commit, prepare_commit};
Expand Down Expand Up @@ -35,21 +35,46 @@ lazy_static! {
}

fn make_client() -> TestResult<DynamoDbLockClient> {
let options: S3StorageOptions = S3StorageOptions::default();
Ok(DynamoDbLockClient::try_new(
None,
None,
S3_OPTIONS.region.clone(),
None,
options.region.clone(),
false,
)?)
}

#[test]
#[serial]
fn client_config_picks_up_lock_table_name() -> TestResult<()> {
let _context = IntegrationContext::new(Box::new(S3Integration::default()))?;
assert!(make_client()?
.get_lock_table_name()
.starts_with("delta_log_it_"));
fn client_configs_via_env_variables() -> TestResult<()> {
std::env::set_var(
deltalake_aws::constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME,
"64",
);
std::env::set_var(
deltalake_aws::constants::LOCK_TABLE_KEY_NAME,
"some_table".to_owned(),
);
std::env::set_var(
deltalake_aws::constants::BILLING_MODE_KEY_NAME,
"PAY_PER_REQUEST".to_owned(),
);
let client = make_client()?;
let config = client.get_dynamodb_config();
assert_eq!(
DynamoDbConfig {
billing_mode: deltalake_aws::BillingMode::PayPerRequest,
lock_table_name: "some_table".to_owned(),
max_elapsed_request_time: Duration::from_secs(64),
use_web_identity: false,
region: config.region.clone(),
},
*config,
);
std::env::remove_var(deltalake_aws::constants::LOCK_TABLE_KEY_NAME);
std::env::remove_var(deltalake_aws::constants::MAX_ELAPSED_REQUEST_TIME_KEY_NAME);
std::env::remove_var(deltalake_aws::constants::BILLING_MODE_KEY_NAME);
Ok(())
}

Expand Down

0 comments on commit 251dded

Please sign in to comment.