From 0e813d8ca1e5517a4550bf523d7c4f62784fba85 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 25 Apr 2024 09:46:16 -0500 Subject: [PATCH 1/6] Handle 429 from GCS --- crates/core/src/operations/transaction/mod.rs | 3 ++- crates/gcp/Cargo.toml | 1 + crates/gcp/src/lib.rs | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6606a8c339..08449c58fa 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -550,9 +550,11 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { ); match conflict_checker.check_conflicts() { Ok(_) => { + println!("Attempt {} failed: Version {} already exists", attempt_number, version); attempt_number += 1; } Err(err) => { + panic!(); this.log_store .object_store() .delete_with_retries(tmp_commit, 15) @@ -570,7 +572,6 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } } } - Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into()) }) } diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index daa9042c83..a0d7860ee3 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -25,6 +25,7 @@ thiserror = { workspace = true } tokio = { workspace = true } regex = { workspace = true } url = { workspace = true } +reqwest = "0.12.4" [dev-dependencies] chrono = { workspace = true } diff --git a/crates/gcp/src/lib.rs b/crates/gcp/src/lib.rs index 6fe040d398..ada4838ac1 100644 --- a/crates/gcp/src/lib.rs +++ b/crates/gcp/src/lib.rs @@ -13,6 +13,7 @@ use url::Url; mod config; pub mod error; +mod storage; trait GcpOptions { fn as_gcp_options(&self) -> HashMap; @@ -43,6 +44,7 @@ impl ObjectStoreFactory for GcpFactory { ) -> DeltaResult<(ObjectStoreRef, Path)> { let config = config::GcpConfigHelper::try_new(options.as_gcp_options())?.build()?; let (store, prefix) = parse_url_opts(url, config)?; + let store = crate::storage::GcsStorageBackend::try_new(Arc::new(store))?; Ok((url_prefix_handler(store, prefix.clone())?, prefix)) } } From c4b6fbbf4c9b5917a50fdabf4bc2e95e2a075475 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 25 Apr 2024 09:47:58 -0500 Subject: [PATCH 2/6] Add file --- crates/core/src/operations/transaction/mod.rs | 3 +- crates/gcp/Cargo.toml | 1 - crates/gcp/src/storage.rs | 137 ++++++++++++++++++ 3 files changed, 138 insertions(+), 3 deletions(-) create mode 100644 crates/gcp/src/storage.rs diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 08449c58fa..6606a8c339 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -550,11 +550,9 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { ); match conflict_checker.check_conflicts() { Ok(_) => { - println!("Attempt {} failed: Version {} already exists", attempt_number, version); attempt_number += 1; } Err(err) => { - panic!(); this.log_store .object_store() .delete_with_retries(tmp_commit, 15) @@ -572,6 +570,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } } } + Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into()) }) } diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index a0d7860ee3..daa9042c83 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -25,7 +25,6 @@ thiserror = { workspace = true } tokio = { workspace = true } regex = { workspace = true } url = { workspace = true } -reqwest = "0.12.4" [dev-dependencies] chrono = { workspace = true } diff --git a/crates/gcp/src/storage.rs b/crates/gcp/src/storage.rs new file mode 100644 index 0000000000..35d2f00d61 --- /dev/null +++ b/crates/gcp/src/storage.rs @@ -0,0 +1,137 @@ +//! AWS S3 storage backend. + +use bytes::Bytes; +use deltalake_core::storage::ObjectStoreRef; +use deltalake_core::Path; +use futures::stream::BoxStream; +use std::ops::Range; +use tokio::io::AsyncWrite; + +use deltalake_core::storage::object_store::{ + Result as ObjectStoreResult, PutOptions, GetOptions, ListResult, MultipartId, ObjectMeta, PutResult, GetResult, ObjectStore +}; + +pub(crate) struct GcsStorageBackend { + inner: ObjectStoreRef, +} + +impl GcsStorageBackend { + pub fn try_new(storage: ObjectStoreRef) -> ObjectStoreResult { + Ok(Self { inner: storage }) + } +} + +impl std::fmt::Debug for GcsStorageBackend { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "GcsStorageBackend") + } +} + +impl std::fmt::Display for GcsStorageBackend { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "GcsStorageBackend") + } +} + +#[async_trait::async_trait] +impl ObjectStore for GcsStorageBackend { + async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult { + self.inner.put(location, bytes).await + } + + async fn put_opts( + &self, + location: &Path, + bytes: Bytes, + options: PutOptions, + ) -> ObjectStoreResult { + self.inner.put_opts(location, bytes, options).await + } + + async fn get(&self, location: &Path) -> ObjectStoreResult { + self.inner.get(location).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + self.inner.get_opts(location, options).await + } + + async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { + self.inner.get_range(location, range).await + } + + async fn head(&self, location: &Path) -> ObjectStoreResult { + self.inner.head(location).await + } + + async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { + self.inner.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'_, ObjectStoreResult> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + let res = self.inner.rename_if_not_exists(from, to).await; + match res { + Ok(_) => Ok(()), + Err(e) => { + match e { + object_store::Error::Generic { store, source } => { + // If this is a 429 (rate limit) error it means more than 1 mutation operation per second + // Was attempted on this same key + // That means we're experiencing concurrency conflicts, so return a transaction error + // Source would be a reqwest error which we don't have access to so the easiest thing to do is check + // for "429" in the error message + if format!("{:?}", source).contains("429") { + Err( + object_store::Error::AlreadyExists { path: to.to_string(), source } + ) + } else { + Err( + object_store::Error::Generic { store, source } + ) + } + } + _ => Err(e) + } + }, + } + } + + async fn put_multipart( + &self, + location: &Path, + ) -> ObjectStoreResult<(MultipartId, Box)> { + self.inner.put_multipart(location).await + } + + async fn abort_multipart( + &self, + location: &Path, + multipart_id: &MultipartId, + ) -> ObjectStoreResult<()> { + self.inner.abort_multipart(location, multipart_id).await + } +} From 51fda64392780ee876ce5de9a537aececd4d6b1c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 25 Apr 2024 09:49:40 -0500 Subject: [PATCH 3/6] add debug --- crates/core/src/operations/transaction/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 6606a8c339..c29948fb92 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -550,6 +550,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { ); match conflict_checker.check_conflicts() { Ok(_) => { + println!("Attempt {} failed: Version {} already exists", attempt_number, version); attempt_number += 1; } Err(err) => { From 71627042aba31d2448677781b40d0f9c2354d7cf Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 26 Apr 2024 07:29:01 -0500 Subject: [PATCH 4/6] Update crates/gcp/src/storage.rs --- crates/gcp/src/storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/gcp/src/storage.rs b/crates/gcp/src/storage.rs index 35d2f00d61..075fe61cae 100644 --- a/crates/gcp/src/storage.rs +++ b/crates/gcp/src/storage.rs @@ -1,4 +1,4 @@ -//! AWS S3 storage backend. +//! GCP GCS storage backend. use bytes::Bytes; use deltalake_core::storage::ObjectStoreRef; From 9006471c45caf7d970585041a7e7d3ae3a946b26 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 26 Apr 2024 14:59:52 -0400 Subject: [PATCH 5/6] Update mod.rs --- crates/core/src/operations/transaction/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index c29948fb92..6606a8c339 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -550,7 +550,6 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { ); match conflict_checker.check_conflicts() { Ok(_) => { - println!("Attempt {} failed: Version {} already exists", attempt_number, version); attempt_number += 1; } Err(err) => { From ff78c638dc1b82a0b30c95e6fb6294d324e4f6fe Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 29 Apr 2024 05:16:19 +0000 Subject: [PATCH 6/6] chore: increment the version of the deltalake-gcp crate This change also loosens the meta-crate version dependency to allow more easy upgrades in the future --- crates/deltalake/Cargo.toml | 2 +- crates/gcp/Cargo.toml | 2 +- crates/gcp/src/storage.rs | 18 +++++++++--------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index ba3e26cbf9..0d5c77c8e2 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -20,7 +20,7 @@ features = ["azure", "datafusion", "gcs", "hdfs", "json", "mount", "python", "s3 deltalake-core = { version = "0.17.1", path = "../core" } deltalake-aws = { version = "0.1.0", path = "../aws", default-features = false, optional = true } deltalake-azure = { version = "0.1.0", path = "../azure", optional = true } -deltalake-gcp = { version = "0.1.0", path = "../gcp", optional = true } +deltalake-gcp = { version = "0.2", path = "../gcp", optional = true } deltalake-catalog-glue = { version = "0.1.0", path = "../catalog-glue", optional = true } deltalake-mount = { version = "0.1.0", path = "../mount", optional = true } diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index daa9042c83..bf94ee29be 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-gcp" -version = "0.1.0" +version = "0.2.0" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/gcp/src/storage.rs b/crates/gcp/src/storage.rs index 075fe61cae..9b938b737e 100644 --- a/crates/gcp/src/storage.rs +++ b/crates/gcp/src/storage.rs @@ -8,7 +8,8 @@ use std::ops::Range; use tokio::io::AsyncWrite; use deltalake_core::storage::object_store::{ - Result as ObjectStoreResult, PutOptions, GetOptions, ListResult, MultipartId, ObjectMeta, PutResult, GetResult, ObjectStore + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result as ObjectStoreResult, }; pub(crate) struct GcsStorageBackend { @@ -105,18 +106,17 @@ impl ObjectStore for GcsStorageBackend { // Source would be a reqwest error which we don't have access to so the easiest thing to do is check // for "429" in the error message if format!("{:?}", source).contains("429") { - Err( - object_store::Error::AlreadyExists { path: to.to_string(), source } - ) + Err(object_store::Error::AlreadyExists { + path: to.to_string(), + source, + }) } else { - Err( - object_store::Error::Generic { store, source } - ) + Err(object_store::Error::Generic { store, source }) } } - _ => Err(e) + _ => Err(e), } - }, + } } }