From d78f9ad793b9b599c71ebaf7b5550bf877ac1b02 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 14 Jun 2024 16:29:31 +0800 Subject: [PATCH 1/4] fix(s3): parse MultipartUploadResponse to check error in body Signed-off-by: Ruihang Xia --- core/src/services/s3/core.rs | 56 ++++++++++++++++++++++++++++++++++ core/src/services/s3/writer.rs | 11 ++++++- 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 708554e4510f..1f5ba7842df1 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -765,6 +765,22 @@ pub struct CompleteMultipartUploadRequest { pub part: Vec, } +/// Result of MultipartUploadRequest. +/// +/// Result part on success are ignored. +#[derive(Default, Debug, Deserialize, Eq, PartialEq)] +#[serde(default, rename_all = "PascalCase")] +pub struct CompleteMultipartUploadResult { + pub error: Vec, +} + +#[derive(Default, Debug, Deserialize, PartialEq, Eq)] +#[serde(default, rename_all = "PascalCase")] +pub struct CompleteMultipartUploadResultError { + pub code: String, + pub message: String, +} + #[derive(Clone, Default, Debug, Serialize)] #[serde(default, rename_all = "PascalCase")] pub struct CompleteMultipartUploadRequestPart { @@ -971,6 +987,46 @@ mod tests { ) } + /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Example_4 + #[test] + fn test_deserialize_complete_multipart_upload_response() { + let bs = Bytes::from( + r#" + + InternalError + We encountered an internal error. Please try again. + 656c76696e6727732072657175657374 + Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg== + "#, + ); + + let expected = CompleteMultipartUploadResult { + error: vec![CompleteMultipartUploadResultError { + code: "InternalError".to_string(), + message: "We encountered an internal error. Please try again.".to_string(), + }], + }; + + let actual: CompleteMultipartUploadResult = + quick_xml::de::from_reader(bs.reader()).expect("must success"); + assert_eq!(actual, expected); + + // suceess case are ignored + let bs = Bytes::from( + r#" + + http://Example-Bucket.s3.amazonaws.com/Example-Object + example-bucket + example-object + "3858f62230ac3c915f300c664312c11f-9" + "#, + ); + let expected = CompleteMultipartUploadResult::default(); + let actual: CompleteMultipartUploadResult = + quick_xml::de::from_reader(bs.reader()).expect("must success"); + assert_eq!(actual, expected); + } + /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples #[test] fn test_serialize_delete_objects_request() { diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 3866404442aa..85b4ba337ef9 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -158,7 +158,16 @@ impl oio::MultipartWrite for S3Writer { let status = resp.status(); match status { - StatusCode::OK => Ok(()), + StatusCode::OK => { + let (_, mut body) = resp.into_parts(); + let bs = body.copy_to_bytes(body.remaining()); + let result: CompleteMultipartUploadResult = + quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + if !result.error.is_empty() { + return Err(Error::new(ErrorKind::Unexpected, &result.error[0].message)); + } + Ok(()) + } _ => Err(parse_error(resp).await?), } } From fe67aa93ae49332b8ee8f91ea977315809abc689 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 14 Jun 2024 17:02:46 +0800 Subject: [PATCH 2/4] move error to root layer Signed-off-by: Ruihang Xia --- core/src/services/s3/core.rs | 14 +++----------- core/src/services/s3/writer.rs | 4 ++-- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 1f5ba7842df1..629ce477adfa 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -769,14 +769,8 @@ pub struct CompleteMultipartUploadRequest { /// /// Result part on success are ignored. #[derive(Default, Debug, Deserialize, Eq, PartialEq)] -#[serde(default, rename_all = "PascalCase")] +#[serde(default, rename = "Error", rename_all = "PascalCase")] pub struct CompleteMultipartUploadResult { - pub error: Vec, -} - -#[derive(Default, Debug, Deserialize, PartialEq, Eq)] -#[serde(default, rename_all = "PascalCase")] -pub struct CompleteMultipartUploadResultError { pub code: String, pub message: String, } @@ -1001,10 +995,8 @@ mod tests { ); let expected = CompleteMultipartUploadResult { - error: vec![CompleteMultipartUploadResultError { - code: "InternalError".to_string(), - message: "We encountered an internal error. Please try again.".to_string(), - }], + code: "InternalError".to_string(), + message: "We encountered an internal error. Please try again.".to_string(), }; let actual: CompleteMultipartUploadResult = diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 85b4ba337ef9..cb7d143a2226 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -163,8 +163,8 @@ impl oio::MultipartWrite for S3Writer { let bs = body.copy_to_bytes(body.remaining()); let result: CompleteMultipartUploadResult = quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; - if !result.error.is_empty() { - return Err(Error::new(ErrorKind::Unexpected, &result.error[0].message)); + if !result.code.is_empty() { + return Err(Error::new(ErrorKind::Unexpected, &result.message)); } Ok(()) } From 3bad36762f174afa3c50b186c59087532a54824e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 14 Jun 2024 19:23:59 +0800 Subject: [PATCH 3/4] reduce one clone Signed-off-by: Ruihang Xia --- core/src/services/s3/writer.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index cb7d143a2226..9b676ccf83d2 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -159,10 +159,9 @@ impl oio::MultipartWrite for S3Writer { match status { StatusCode::OK => { - let (_, mut body) = resp.into_parts(); - let bs = body.copy_to_bytes(body.remaining()); let result: CompleteMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + quick_xml::de::from_reader(resp.into_body().reader()) + .map_err(new_xml_deserialize_error)?; if !result.code.is_empty() { return Err(Error::new(ErrorKind::Unexpected, &result.message)); } From 1bc1e55c24274998afeba18e9c2539233b92aec4 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 14 Jun 2024 23:04:37 +0800 Subject: [PATCH 4/4] refactor to use S3Error Signed-off-by: Ruihang Xia --- core/src/services/s3/core.rs | 48 ---------------------------------- core/src/services/s3/error.rs | 46 +++++++++++++++++++++++++++----- core/src/services/s3/writer.rs | 15 ++++++----- 3 files changed, 49 insertions(+), 60 deletions(-) diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 629ce477adfa..708554e4510f 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -765,16 +765,6 @@ pub struct CompleteMultipartUploadRequest { pub part: Vec, } -/// Result of MultipartUploadRequest. -/// -/// Result part on success are ignored. -#[derive(Default, Debug, Deserialize, Eq, PartialEq)] -#[serde(default, rename = "Error", rename_all = "PascalCase")] -pub struct CompleteMultipartUploadResult { - pub code: String, - pub message: String, -} - #[derive(Clone, Default, Debug, Serialize)] #[serde(default, rename_all = "PascalCase")] pub struct CompleteMultipartUploadRequestPart { @@ -981,44 +971,6 @@ mod tests { ) } - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Example_4 - #[test] - fn test_deserialize_complete_multipart_upload_response() { - let bs = Bytes::from( - r#" - - InternalError - We encountered an internal error. Please try again. - 656c76696e6727732072657175657374 - Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg== - "#, - ); - - let expected = CompleteMultipartUploadResult { - code: "InternalError".to_string(), - message: "We encountered an internal error. Please try again.".to_string(), - }; - - let actual: CompleteMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).expect("must success"); - assert_eq!(actual, expected); - - // suceess case are ignored - let bs = Bytes::from( - r#" - - http://Example-Bucket.s3.amazonaws.com/Example-Object - example-bucket - example-object - "3858f62230ac3c915f300c664312c11f-9" - "#, - ); - let expected = CompleteMultipartUploadResult::default(); - let actual: CompleteMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).expect("must success"); - assert_eq!(actual, expected); - } - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples #[test] fn test_serialize_delete_objects_request() { diff --git a/core/src/services/s3/error.rs b/core/src/services/s3/error.rs index 079789ef0124..6602ddc39626 100644 --- a/core/src/services/s3/error.rs +++ b/core/src/services/s3/error.rs @@ -16,6 +16,7 @@ // under the License. use bytes::Buf; +use http::response::Parts; use http::Response; use quick_xml::de; use serde::Deserialize; @@ -24,13 +25,13 @@ use crate::raw::*; use crate::*; /// S3Error is the error returned by s3 service. -#[derive(Default, Debug, Deserialize)] +#[derive(Default, Debug, Deserialize, PartialEq, Eq)] #[serde(default, rename_all = "PascalCase")] -struct S3Error { - code: String, - message: String, - resource: String, - request_id: String, +pub(crate) struct S3Error { + pub code: String, + pub message: String, + pub resource: String, + pub request_id: String, } /// Parse error response into Error. @@ -68,6 +69,21 @@ pub async fn parse_error(resp: Response) -> Result { Ok(err) } +/// Util function to build [`Error`] from a [`S3Error`] object. +pub(crate) fn from_s3_error(s3_error: S3Error, parts: Parts) -> Error { + let (kind, retryable) = + parse_s3_error_code(s3_error.code.as_str()).unwrap_or((ErrorKind::Unexpected, false)); + let mut err = Error::new(kind, &format!("{s3_error:?}")); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + err +} + /// Returns the `Error kind` of this code and whether the error is retryable. /// All possible error code: pub fn parse_s3_error_code(code: &str) -> Option<(ErrorKind, bool)> { @@ -123,4 +139,22 @@ mod tests { assert_eq!(out.resource, "/mybucket/myfoto.jpg"); assert_eq!(out.request_id, "4442587FB7D0A2F9"); } + + #[test] + fn test_parse_error_from_unrelated_input() { + let bs = bytes::Bytes::from( + r#" + + + http://Example-Bucket.s3.ap-southeast-1.amazonaws.com/Example-Object + Example-Bucket + Example-Object + "3858f62230ac3c915f300c664312c11f-9" + +"#, + ); + + let out: S3Error = de::from_reader(bs.reader()).expect("must success"); + assert_eq!(out, S3Error::default()); + } } diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 9b676ccf83d2..86c42f5683e1 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -21,7 +21,7 @@ use bytes::Buf; use http::StatusCode; use super::core::*; -use super::error::parse_error; +use super::error::{from_s3_error, parse_error, S3Error}; use crate::raw::*; use crate::*; @@ -159,12 +159,15 @@ impl oio::MultipartWrite for S3Writer { match status { StatusCode::OK => { - let result: CompleteMultipartUploadResult = - quick_xml::de::from_reader(resp.into_body().reader()) - .map_err(new_xml_deserialize_error)?; - if !result.code.is_empty() { - return Err(Error::new(ErrorKind::Unexpected, &result.message)); + // still check if there is any error because S3 might return error for status code 200 + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Example_4 + let (parts, body) = resp.into_parts(); + let maybe_error: S3Error = + quick_xml::de::from_reader(body.reader()).map_err(new_xml_deserialize_error)?; + if !maybe_error.code.is_empty() { + return Err(from_s3_error(maybe_error, parts)); } + Ok(()) } _ => Err(parse_error(resp).await?),