Skip to content

Commit

Permalink
Refactor AWS specific code into the deltalake-aws crate
Browse files Browse the repository at this point in the history
There are a number of changes here to untangle the coupling inside of
deltalake-core to allow deltalake-aws to be separated properly
  • Loading branch information
rtyler committed Jan 2, 2024
1 parent 40e3b0d commit 241db34
Show file tree
Hide file tree
Showing 542 changed files with 2,008 additions and 4,641 deletions.
22 changes: 0 additions & 22 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,25 +144,3 @@ jobs:
- name: Run tests with native-tls
run: |
cargo test --no-default-features --features integration_test,s3-native-tls,datafusion
parquet2_test:
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-C debuginfo=line-tables-only"
CARGO_INCREMENTAL: 0

steps:
- uses: actions/checkout@v3

- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: stable
override: true

- uses: Swatinem/rust-cache@v2

- name: Run tests
working-directory: crates/deltalake-core
run: cargo test --no-default-features --features=parquet2
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ serde_json = "1"
# "stdlib"
bytes = { version = "1" }
chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
log = { version = "0.4" }
regex = { version = "1" }
thiserror = { version = "1" }
url = { version = "2" }
Expand Down
26 changes: 18 additions & 8 deletions crates/deltalake-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,43 @@ name = "deltalake-aws"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
deltalake-core = { path = "../deltalake-core" }
rusoto_core = { version = "0.47", default-features = false, optional = true }
rusoto_credential = { version = "0.47", optional = true }
rusoto_credential = { version = "0.47" }
rusoto_sts = { version = "0.47", default-features = false, optional = true }
rusoto_dynamodb = { version = "0.47", default-features = false, optional = true }
object_store = "0.7"
object_store = { version = "0.7.1", features = ["aws"]}
lazy_static = "1"
maplit = "1"
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
regex = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
serial_test = "2"
deltalake-test = { path = "../deltalake-test" }
pretty_env_logger = "*"
rand = "0.8"
serde_json = { workspace = true }

[features]
default = ["rustls"]
integration_test = []
native-tls = [
"rusoto_core/native-tls",
"rusoto_credential",
"rusoto_sts/native-tls",
"rusoto_dynamodb/native-tls",
"object_store/aws",
]
rustls = [
"rusoto_core/rustls",
"rusoto_credential",
"rusoto_sts/rustls",
"rusoto_dynamodb/rustls",
"object_store/aws",
]
File renamed without changes.
72 changes: 70 additions & 2 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
//! Lock client implementation based on DynamoDb.

pub mod errors;
pub mod logstore;
pub mod storage;

use lazy_static::lazy_static;
use log::*;
use regex::Regex;
use std::{
collections::HashMap,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};

use object_store::path::Path;
use deltalake_core::logstore::{logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions};
use deltalake_core::{DeltaResult, Path};
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_credential::AutoRefreshingProvider;
use rusoto_dynamodb::{
Expand All @@ -19,8 +25,48 @@ use rusoto_dynamodb::{
UpdateItemError, UpdateItemInput,
};
use rusoto_sts::WebIdentityProvider;
use url::Url;

use errors::{DynamoDbConfigError, LockClientError};
use storage::{S3ObjectStoreFactory, S3StorageOptions};

#[derive(Clone, Debug, Default)]
struct S3LogStoreFactory {}

impl LogStoreFactory for S3LogStoreFactory {
fn with_options(
&self,
store: ObjectStoreRef,
location: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store, Path::parse(location.path())?)?;
let s3_options = S3StorageOptions::from_map(&options.0);

if s3_options.locking_provider.as_deref() != Some("dynamodb") {
debug!("S3LogStoreFactory has been asked to create a LogStore without the dynamodb locking provider");
return Ok(deltalake_core::logstore::default_logstore(
store, location, options,
));
}

Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
location.clone(),
options.clone(),
&s3_options,
store,
)?))
}
}

/// Register an [ObjectStoreFactory] for common S3 [Url] schemes
pub fn register_handlers(_additional_prefixes: Option<Url>) {
for scheme in ["s3", "s3a"].iter() {
let url = Url::parse(&format!("{}://", scheme)).unwrap();
factories().insert(url.clone(), Arc::new(S3ObjectStoreFactory::default()));
logstores().insert(url.clone(), Arc::new(S3LogStoreFactory::default()));
}
}

/// Representation of a log entry stored in DynamoDb
/// dynamo db item consists of:
Expand Down Expand Up @@ -62,6 +108,12 @@ pub struct DynamoDbLockClient {
config: DynamoDbConfig,
}

impl std::fmt::Debug for DynamoDbLockClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(f, "DynamoDbLockClient(config: {:?})", self.config)
}
}

impl DynamoDbLockClient {
/// Creates a new DynamoDbLockClient from the supplied storage options.
pub fn try_new(
Expand Down Expand Up @@ -514,8 +566,9 @@ fn extract_version_from_filename(name: &str) -> Option<i64> {

#[cfg(test)]
mod tests {

use super::*;
use object_store::memory::InMemory;
use serial_test::serial;

fn commit_entry_roundtrip(c: &CommitEntry) -> Result<(), LockClientError> {
let item_data: HashMap<String, AttributeValue> = create_value_map(c, "some_table");
Expand Down Expand Up @@ -547,4 +600,19 @@ mod tests {
})?;
Ok(())
}

/// In cases where there is no dynamodb specified locking provider, this should get a default
/// logstore
#[test]
#[serial]
fn test_logstore_factory_default() {
let factory = S3LogStoreFactory::default();
let store = InMemory::new();
let url = Url::parse("s3://test-bucket").unwrap();
std::env::remove_var(storage::s3_constants::AWS_S3_LOCKING_PROVIDER);
let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.unwrap();
assert_eq!(logstore.name(), "DefaultLogStore");
}
}
Loading

0 comments on commit 241db34

Please sign in to comment.