From 3206e8cc480b962b0ce459a67523b6f74e8014bb Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Tue, 5 Nov 2024 01:53:10 -0500 Subject: [PATCH 1/4] Support native S3 conditional writes Add support for `PutMode::Create` and `copy_if_not_exists` on native AWS S3, which uses the underlying conditional write primitive that Amazon launched earlier this year [0]. The conditional write primitive is simpler than what's available in other S3-like products (e.g., R2), so new modes for `s3_copy_if_not_exists` and `s3_conditional_put` are added to select the native S3-specific behavior. To maintain strict backwards compatibility (e.g. with older versions of LocalStack), the new behavior is not on by default. It must be explicitly requested by the end user. The implementation for `PutMode::Create` is straightforward. The implementation of `copy_if_not_exists` is a bit more involved, as it requires managing a multipart upload that uses the UploadPartCopy operation, which was not previously supported by this crate's S3 client. To ensure test coverage, the object store workflow now runs the AWS integration tests with conditional put both disabled and enabled. Fix #6285. 
[0]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/ --- .github/workflows/object_store.yml | 6 ++ object_store/src/aws/client.rs | 85 +++++++++++++++++++++++----- object_store/src/aws/mod.rs | 64 ++++++++++++++++++--- object_store/src/aws/precondition.rs | 28 +++++++++ object_store/src/client/s3.rs | 7 +++ object_store/src/integration.rs | 6 ++ 6 files changed, 173 insertions(+), 23 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index bdbfc0bec4bb..c073e37da6f4 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -161,6 +161,12 @@ jobs: - name: Run object_store tests run: cargo test --features=aws,azure,gcp,http + - name: Run object_store tests (AWS native conditional put) + run: cargo test --features=aws + env: + AWS_CONDITIONAL_PUT: etag-create-only + AWS_COPY_IF_NOT_EXISTS: multipart + - name: GCS Output if: ${{ !cancelled() }} run: docker logs $GCS_CONTAINER diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 4b4d0b6e3b4e..2e04683e7b30 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -28,8 +28,8 @@ use crate::client::header::{get_put_result, get_version}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::s3::{ - CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, - ListResponse, + CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult, + InitiateMultipartUploadResult, ListResponse, }; use crate::client::GetOptionsExt; use crate::multipart::PartId; @@ -98,8 +98,11 @@ pub(crate) enum Error { #[snafu(display("Error getting create multipart response body: {}", source))] CreateMultipartResponseBody { source: reqwest::Error }, - #[snafu(display("Error performing complete multipart request: {}", source))] - CompleteMultipartRequest { source: crate::client::retry::Error }, + 
#[snafu(display("Error performing complete multipart request: {}: {}", path, source))] + CompleteMultipartRequest { + source: crate::client::retry::Error, + path: String, + }, #[snafu(display("Error getting complete multipart response body: {}", source))] CompleteMultipartResponseBody { source: reqwest::Error }, @@ -118,13 +121,32 @@ pub(crate) enum Error { impl From for crate::Error { fn from(err: Error) -> Self { - Self::Generic { - store: STORE, - source: Box::new(err), + match err { + Error::CompleteMultipartRequest { source, path } => source.error(STORE, path), + _ => Self::Generic { + store: STORE, + source: Box::new(err), + }, } } } +pub(crate) enum PutPartPayload<'a> { + Part(PutPayload), + Copy(&'a Path), +} + +impl Default for PutPartPayload<'_> { + fn default() -> Self { + Self::Part(PutPayload::default()) + } +} + +pub(crate) enum CompleteMultipartMode { + Overwrite, + Create, +} + #[derive(Deserialize)] #[serde(rename_all = "PascalCase", rename = "DeleteResult")] struct BatchDeleteResponse { @@ -605,15 +627,24 @@ impl S3Client { path: &Path, upload_id: &MultipartId, part_idx: usize, - data: PutPayload, + data: PutPartPayload<'_>, ) -> Result { + let is_copy = matches!(data, PutPartPayload::Copy(_)); let part = (part_idx + 1).to_string(); let mut request = self .request(Method::PUT, path) - .with_payload(data) .query(&[("partNumber", &part), ("uploadId", upload_id)]) .idempotent(true); + + request = match data { + PutPartPayload::Part(payload) => request.with_payload(payload), + PutPartPayload::Copy(path) => request.header( + "x-amz-copy-source", + &format!("{}/{}", self.config.bucket, path), + ), + }; + if self .config .encryption_headers @@ -625,7 +656,18 @@ impl S3Client { } let response = request.send().await?; - let content_id = get_etag(response.headers()).context(MetadataSnafu)?; + let content_id = match is_copy { + false => get_etag(response.headers()).context(MetadataSnafu)?, + true => { + let response = response + .bytes() + .await + 
.context(CreateMultipartResponseBodySnafu)?; + let response: CopyPartResult = quick_xml::de::from_reader(response.reader()) + .context(InvalidMultipartResponseSnafu)?; + response.e_tag + } + }; Ok(PartId { content_id }) } @@ -634,12 +676,18 @@ impl S3Client { location: &Path, upload_id: &str, parts: Vec, + mode: CompleteMultipartMode, ) -> Result { let parts = if parts.is_empty() { // If no parts were uploaded, upload an empty part // otherwise the completion request will fail let part = self - .put_part(location, &upload_id.to_string(), 0, PutPayload::default()) + .put_part( + location, + &upload_id.to_string(), + 0, + PutPartPayload::default(), + ) .await?; vec![part] } else { @@ -651,18 +699,27 @@ impl S3Client { let credential = self.config.get_session_credential().await?; let url = self.config.path_url(location); - let response = self + let request = self .client .request(Method::POST, url) .query(&[("uploadId", upload_id)]) .body(body) - .with_aws_sigv4(credential.authorizer(), None) + .with_aws_sigv4(credential.authorizer(), None); + + let request = match mode { + CompleteMultipartMode::Overwrite => request, + CompleteMultipartMode::Create => request.header("If-None-Match", "*"), + }; + + let response = request .retryable(&self.config.retry_config) .idempotent(true) .retry_error_body(true) .send() .await - .context(CompleteMultipartRequestSnafu)?; + .context(CompleteMultipartRequestSnafu { + path: location.as_ref(), + })?; let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?; diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index a27ed053317e..acfa05395901 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -36,7 +36,7 @@ use reqwest::{Method, StatusCode}; use std::{sync::Arc, time::Duration}; use url::Url; -use crate::aws::client::{RequestError, S3Client}; +use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3Client}; use 
crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; @@ -169,7 +169,10 @@ impl ObjectStore for AmazonS3 { match (opts.mode, &self.client.config.conditional_put) { (PutMode::Overwrite, _) => request.idempotent(true).do_put().await, (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), - (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => { + ( + PutMode::Create, + Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagCreateOnly), + ) => { match request.header(&IF_NONE_MATCH, "*").do_put().await { // Technically If-None-Match should return NotModified but some stores, // such as R2, instead return PreconditionFailed @@ -193,6 +196,7 @@ impl ObjectStore for AmazonS3 { source: "ETag required for conditional put".to_string().into(), })?; match put { + S3ConditionalPut::ETagCreateOnly => Err(Error::NotImplemented), S3ConditionalPut::ETagMatch => { request.header(&IF_MATCH, etag.as_str()).do_put().await } @@ -293,6 +297,34 @@ impl ObjectStore for AmazonS3 { let (k, v, status) = match &self.client.config.copy_if_not_exists { Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED), Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status), + Some(S3CopyIfNotExists::Multipart) => { + let upload_id = self + .client + .create_multipart(to, PutMultipartOpts::default()) + .await?; + let part_id = self + .client + .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) + .await?; + let res = match self + .client + .complete_multipart( + to, + &upload_id, + vec![part_id], + CompleteMultipartMode::Create, + ) + .await + { + Err(e @ Error::Precondition { .. 
}) => Err(Error::AlreadyExists { + path: to.to_string(), + source: Box::new(e), + }), + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + }; + return res; + } Some(S3CopyIfNotExists::Dynamo(lock)) => { return lock.copy_if_not_exists(&self.client, from, to).await } @@ -340,7 +372,12 @@ impl MultipartUpload for S3MultiPartUpload { Box::pin(async move { let part = state .client - .put_part(&state.location, &state.upload_id, idx, data) + .put_part( + &state.location, + &state.upload_id, + idx, + PutPartPayload::Part(data), + ) .await?; state.parts.put(idx, part); Ok(()) @@ -352,7 +389,12 @@ impl MultipartUpload for S3MultiPartUpload { self.state .client - .complete_multipart(&self.state.location, &self.state.upload_id, parts) + .complete_multipart( + &self.state.location, + &self.state.upload_id, + parts, + CompleteMultipartMode::Overwrite, + ) .await } @@ -384,7 +426,9 @@ impl MultipartStore for AmazonS3 { part_idx: usize, data: PutPayload, ) -> Result { - self.client.put_part(path, id, part_idx, data).await + self.client + .put_part(path, id, part_idx, PutPartPayload::Part(data)) + .await } async fn complete_multipart( @@ -393,7 +437,9 @@ impl MultipartStore for AmazonS3 { id: &MultipartId, parts: Vec, ) -> Result { - self.client.complete_multipart(path, id, parts).await + self.client + .complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite) + .await } async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> { @@ -427,7 +473,6 @@ mod tests { let integration = config.build().unwrap(); let config = &integration.client.config; let test_not_exists = config.copy_if_not_exists.is_some(); - let test_conditional_put = config.conditional_put.is_some(); put_get_delete_list(&integration).await; get_opts(&integration).await; @@ -458,8 +503,9 @@ mod tests { if test_not_exists { copy_if_not_exists(&integration).await; } - if test_conditional_put { - put_opts(&integration, true).await; + if let Some(conditional_put) = &config.conditional_put { + let 
supports_update = !matches!(conditional_put, S3ConditionalPut::ETagCreateOnly); + put_opts(&integration, supports_update).await; } // run integration test with unsigned payload enabled diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index ad9e21537939..80f3c1a03615 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -46,6 +46,15 @@ pub enum S3CopyIfNotExists { /// /// Encoded as `header-with-status:::` ignoring whitespace HeaderWithStatus(String, String, reqwest::StatusCode), + /// Native Amazon S3 supports copy if not exists through a multipart upload + /// where the upload copies an existing object and is completed only if + /// the new object does not already exist. + /// + /// WARNING: When using this mode, `copy_if_not_exists` does not copy + /// tags or attributes from the source object. + /// + /// Encoded as `multipart` ignoring whitespace. + Multipart, /// The name of a DynamoDB table to use for coordination /// /// Encoded as either `dynamo:` or `dynamo::` @@ -64,6 +73,7 @@ impl std::fmt::Display for S3CopyIfNotExists { Self::HeaderWithStatus(k, v, code) => { write!(f, "header-with-status: {k}: {v}: {}", code.as_u16()) } + Self::Multipart => f.write_str("multipart"), Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } @@ -71,6 +81,11 @@ impl S3CopyIfNotExists { impl S3CopyIfNotExists { fn from_str(s: &str) -> Option<Self> { + match s.trim() { + "multipart" => return Some(Self::Multipart), + _ => (), + }; + let (variant, value) = s.split_once(':')?; match variant.trim() { "header" => { @@ -118,6 +133,17 @@ pub enum S3ConditionalPut { /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions ETagMatch, + /// Like `ETagMatch`, but with support for `PutMode::Create` and not + /// `PutMode::Update`.
+ /// + /// This is the limited form of conditional put supported by Amazon S3 + /// as of August 2024 ([announcement]). + /// + /// Encoded as `etag-create-only` ignoring whitespace. + /// + /// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/ + ETagCreateOnly, + /// The name of a DynamoDB table to use for coordination /// /// Encoded as either `dynamo:` or `dynamo::` @@ -133,6 +159,7 @@ impl std::fmt::Display for S3ConditionalPut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ETagMatch => write!(f, "etag"), + Self::ETagCreateOnly => write!(f, "etag-create-only"), Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } @@ -142,6 +169,7 @@ impl S3ConditionalPut { fn from_str(s: &str) -> Option { match s.trim() { "etag" => Some(Self::ETagMatch), + "etag-create-only" => Some(Self::ETagCreateOnly), trimmed => match trimmed.split_once(':')? { ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), _ => None, diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index a9c47260e6f4..76be1116753c 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -92,6 +92,13 @@ pub(crate) struct InitiateMultipartUploadResult { pub upload_id: String, } +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct CopyPartResult { + #[serde(rename = "ETag")] + pub e_tag: String, +} + #[derive(Debug, Serialize)] #[serde(rename_all = "PascalCase")] pub(crate) struct CompleteMultipartUpload { diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 89b21bc61696..30177878306f 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -651,6 +651,12 @@ pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { assert_eq!(b.as_ref(), b"a"); if !supports_update { + let err = storage + .put_opts(&path, "c".into(), 
PutMode::Update(v1.clone().into()).into()) + .await + .unwrap_err(); + assert!(matches!(err, Error::NotImplemented { .. }), "{err}"); + return; } From 9cf76bc2fdabd5a165cc2bf6923d0b35dbb79916 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Tue, 5 Nov 2024 17:32:59 -0500 Subject: [PATCH 2/4] Address review feedback --- .github/workflows/object_store.yml | 2 +- object_store/src/aws/client.rs | 12 +++++- object_store/src/aws/mod.rs | 61 +++++++++++++++++----------- object_store/src/aws/precondition.rs | 27 +++++++----- 4 files changed, 65 insertions(+), 37 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index c073e37da6f4..107b4acf78c3 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -164,7 +164,7 @@ jobs: - name: Run object_store tests (AWS native conditional put) run: cargo test --features=aws env: - AWS_CONDITIONAL_PUT: etag-create-only + AWS_CONDITIONAL_PUT: etag-put-if-not-exists AWS_COPY_IF_NOT_EXISTS: multipart - name: GCS Output diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 2e04683e7b30..a610e635178d 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -641,7 +641,7 @@ impl S3Client { PutPartPayload::Part(payload) => request.with_payload(payload), PutPartPayload::Copy(path) => request.header( "x-amz-copy-source", - &format!("{}/{}", self.config.bucket, path), + &format!("{}/{}", self.config.bucket, encode_path(path)), ), }; @@ -671,6 +671,16 @@ impl S3Client { Ok(PartId { content_id }) } + pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> { + self.request(Method::DELETE, location) + .query(&[("uploadId", upload_id)]) + .with_encryption_headers() + .send() + .await?; + + Ok(()) + } + pub(crate) async fn complete_multipart( &self, location: &Path, diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index acfa05395901..b238d90eb6d7 100644 --- 
a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -171,7 +171,7 @@ impl ObjectStore for AmazonS3 { (PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented), ( PutMode::Create, - Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagCreateOnly), + Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagPutIfNotExists), ) => { match request.header(&IF_NONE_MATCH, "*").do_put().await { // Technically If-None-Match should return NotModified but some stores, @@ -196,7 +196,7 @@ impl ObjectStore for AmazonS3 { source: "ETag required for conditional put".to_string().into(), })?; match put { - S3ConditionalPut::ETagCreateOnly => Err(Error::NotImplemented), + S3ConditionalPut::ETagPutIfNotExists => Err(Error::NotImplemented), S3ConditionalPut::ETagMatch => { request.header(&IF_MATCH, etag.as_str()).do_put().await } @@ -302,27 +302,40 @@ impl ObjectStore for AmazonS3 { .client .create_multipart(to, PutMultipartOpts::default()) .await?; - let part_id = self - .client - .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) - .await?; - let res = match self - .client - .complete_multipart( - to, - &upload_id, - vec![part_id], - CompleteMultipartMode::Create, - ) - .await - { - Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { - path: to.to_string(), - source: Box::new(e), - }), - Ok(_) => Ok(()), - Err(e) => Err(e.into()), - }; + + let res = async { + let part_id = self + .client + .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) + .await?; + match self + .client + .complete_multipart( + to, + &upload_id, + vec![part_id], + CompleteMultipartMode::Create, + ) + .await + { + Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { + path: to.to_string(), + source: Box::new(e), + }), + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } + .await; + + // If the multipart upload failed, make a best effort attempt to + // clean it up. 
It's the caller's responsibility to add a + // lifecycle rule if guaranteed cleanup is required, as we + // cannot protect against an ill-timed process crash. + if res.is_err() { + let _ = self.client.abort_multipart(to, &upload_id).await; + } + return res; } Some(S3CopyIfNotExists::Dynamo(lock)) => { @@ -504,7 +517,7 @@ mod tests { copy_if_not_exists(&integration).await; } if let Some(conditional_put) = &config.conditional_put { - let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagCreateOnly); + let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagPutIfNotExists); put_opts(&integration, supports_update).await; } diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index 80f3c1a03615..e5058052790d 100644 --- a/object_store/src/aws/precondition.rs +++ b/object_store/src/aws/precondition.rs @@ -47,11 +47,17 @@ pub enum S3CopyIfNotExists { /// Encoded as `header-with-status:::` ignoring whitespace HeaderWithStatus(String, String, reqwest::StatusCode), /// Native Amazon S3 supports copy if not exists through a multipart upload - /// where the upload copies an existing object and is completed only if - /// the new object does not already exist. + /// where the upload copies an existing object and is completed only if the + /// new object does not already exist. /// - /// WARNING: When using this mode, `copy_if_not_exists` does not copy - /// tags or attributes from the source object. + /// WARNING: When using this mode, `copy_if_not_exists` does not copy tags + /// or attributes from the source object. + /// + /// WARNING: When using this mode, `copy_if_not_exists` makes only a best + /// effort attempt to clean up the multipart upload if the copy operation + /// fails. Consider using a lifecycle rule to automatically clean up + /// abandoned multipart uploads. See [the module + /// docs](super#multipart-uploads) for details. /// /// Encoded as `multipart` ignoring whitespace. 
Multipart, @@ -81,9 +87,8 @@ impl std::fmt::Display for S3CopyIfNotExists { impl S3CopyIfNotExists { fn from_str(s: &str) -> Option { - match s.trim() { - "multipart" => return Some(Self::Multipart), - _ => (), + if s.trim() == "multipart" { + return Some(Self::Multipart); }; let (variant, value) = s.split_once(':')?; @@ -139,10 +144,10 @@ pub enum S3ConditionalPut { /// This is the limited form of conditional put supported by Amazon S3 /// as of August 2024 ([announcement]). /// - /// Encoded as `etag-create-only` ignoring whitespace. + /// Encoded as `etag-put-if-not-exists` ignoring whitespace. /// /// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/ - ETagCreateOnly, + ETagPutIfNotExists, /// The name of a DynamoDB table to use for coordination /// @@ -159,7 +164,7 @@ impl std::fmt::Display for S3ConditionalPut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ETagMatch => write!(f, "etag"), - Self::ETagCreateOnly => write!(f, "etag-create-only"), + Self::ETagPutIfNotExists => write!(f, "etag-put-if-not-exists"), Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } @@ -169,7 +174,7 @@ impl S3ConditionalPut { fn from_str(s: &str) -> Option { match s.trim() { "etag" => Some(Self::ETagMatch), - "etag-create-only" => Some(Self::ETagCreateOnly), + "etag-put-if-not-exists" => Some(Self::ETagPutIfNotExists), trimmed => match trimmed.split_once(':')? 
{ ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), _ => None, From a974e76b52e9431c6bcb3edf0074ede4047b0240 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Thu, 7 Nov 2024 17:35:43 -0500 Subject: [PATCH 3/4] Fix clippy failure --- object_store/src/client/s3.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index 76be1116753c..dba752cb1251 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -92,6 +92,7 @@ pub(crate) struct InitiateMultipartUploadResult { pub upload_id: String, } +#[cfg(feature = "aws")] #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] pub(crate) struct CopyPartResult { From 8bbf64172feea632ab9613ce8dbc91f4fc7f2859 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Thu, 7 Nov 2024 17:36:02 -0500 Subject: [PATCH 4/4] Upgrade localstack in GitHub Actions To a version that supports conditional writes. --- .github/workflows/object_store.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 107b4acf78c3..1857b330326a 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -138,7 +138,7 @@ jobs: - name: Setup LocalStack (AWS emulation) run: | - echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.3.0)" >> $GITHUB_ENV + echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:3.8.1)" >> $GITHUB_ENV echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S 
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5