diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index c76dc665b583..319fd397e0f0 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -440,21 +440,27 @@ impl Access for OssBackend { } async fn stat(&self, path: &str, args: OpStat) -> Result { - let resp = self - .core - .oss_head_object(path, args.if_match(), args.if_none_match()) - .await?; + let resp = self.core.oss_head_object(path, &args).await?; let status = resp.status(); match status { - StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new), + StatusCode::OK => { + let headers = resp.headers(); + let mut meta = parse_into_metadata(path, headers)?; + + if let Some(v) = parse_header_to_str(headers, "x-oss-version-id")? { + meta.set_version(v); + } + + Ok(RpStat::new(meta)) + } _ => Err(parse_error(resp).await?), } } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.core.oss_get_object(path, args.range(), &args).await?; + let resp = self.core.oss_get_object(path, &args).await?; let status = resp.status(); @@ -486,8 +492,8 @@ impl Access for OssBackend { Ok((RpWrite::default(), w)) } - async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.core.oss_delete_object(path).await?; + async fn delete(&self, path: &str, args: OpDelete) -> Result { + let resp = self.core.oss_delete_object(path, &args).await?; let status = resp.status(); match status { StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(RpDelete::default()), @@ -519,14 +525,8 @@ impl Access for OssBackend { async fn presign(&self, path: &str, args: OpPresign) -> Result { // We will not send this request out, just for signing. let mut req = match args.operation() { - PresignOperation::Stat(v) => { - self.core - .oss_head_object_request(path, true, v.if_match(), v.if_none_match())? - } - PresignOperation::Read(v) => { - self.core - .oss_get_object_request(path, BytesRange::default(), true, v)? - } + PresignOperation::Stat(v) => self.core.oss_head_object_request(path, true, v)?, + PresignOperation::Read(v) => self.core.oss_get_object_request(path, true, v)?, PresignOperation::Write(v) => { self.core .oss_put_object_request(path, None, v, Buffer::new(), true)? diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index 04b91ad74d3e..e9de4cc0ab24 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -47,6 +47,8 @@ mod constants { pub const X_OSS_SERVER_SIDE_ENCRYPTION_KEY_ID: &str = "x-oss-server-side-encryption-key-id"; pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition"; + + pub const OSS_QUERY_VERSION_ID: &str = "versionId"; } pub struct OssCore { @@ -236,12 +238,12 @@ impl OssCore { pub fn oss_get_object_request( &self, path: &str, - range: BytesRange, is_presign: bool, args: &OpRead, ) -> Result> { let p = build_abs_path(&self.root, path); let endpoint = self.get_endpoint(is_presign); + let range = args.range(); let mut url = format!("{}/{}", endpoint, percent_encode_path(&p)); // Add query arguments to the URL based on response overrides @@ -253,6 +255,13 @@ impl OssCore { percent_encode_path(override_content_disposition) )) } + if let Some(version) = args.version() { + query_args.push(format!( + "{}={}", + constants::OSS_QUERY_VERSION_ID, + percent_encode_path(version) + )) + } if !query_args.is_empty() { url.push_str(&format!("?{}", query_args.join("&"))); @@ -280,10 +289,25 @@ impl OssCore { Ok(req) } - fn oss_delete_object_request(&self, path: &str) -> Result> { + fn oss_delete_object_request(&self, path: &str, args: &OpDelete) -> Result> { let p = build_abs_path(&self.root, path); let endpoint = self.get_endpoint(false); - let url = format!("{}/{}", endpoint, percent_encode_path(&p)); + let mut url = format!("{}/{}", endpoint, percent_encode_path(&p)); + + let mut query_args = Vec::new(); + + if let Some(version) = args.version() { + query_args.push(format!( + "{}={}", + constants::OSS_QUERY_VERSION_ID, + percent_encode_path(version) + )) + } + + if !query_args.is_empty() { + url.push_str(&format!("?{}", query_args.join("&"))); + } + let req = Request::delete(&url); let req = req.body(Buffer::new()).map_err(new_request_build_error)?; @@ -295,18 +319,31 @@ impl OssCore { &self, path: &str, is_presign: bool, - if_match: Option<&str>, - if_none_match: Option<&str>, + args: &OpStat, ) -> Result> { let p = build_abs_path(&self.root, path); let endpoint = self.get_endpoint(is_presign); - let url = format!("{}/{}", endpoint, percent_encode_path(&p)); + let mut url = format!("{}/{}", endpoint, percent_encode_path(&p)); + + let mut query_args = Vec::new(); + + if let Some(version) = args.version() { + query_args.push(format!( + "{}={}", + constants::OSS_QUERY_VERSION_ID, + percent_encode_path(version) + )) + } + + if !query_args.is_empty() { + url.push_str(&format!("?{}", query_args.join("&"))); + } let mut req = Request::head(&url); - if let Some(if_match) = if_match { + if let Some(if_match) = args.if_match() { req = req.header(IF_MATCH, if_match) } - if let Some(if_none_match) = if_none_match { + if let Some(if_none_match) = args.if_none_match() { req = req.header(IF_NONE_MATCH, if_none_match); } let req = req.body(Buffer::new()).map_err(new_request_build_error)?; @@ -358,24 +395,14 @@ impl OssCore { Ok(req) } - pub async fn oss_get_object( - &self, - path: &str, - range: BytesRange, - args: &OpRead, - ) -> Result> { - let mut req = self.oss_get_object_request(path, range, false, args)?; + pub async fn oss_get_object(&self, path: &str, args: &OpRead) -> Result> { + let mut req = self.oss_get_object_request(path, false, args)?; self.sign(&mut req).await?; self.client.fetch(req).await } - pub async fn oss_head_object( - &self, - path: &str, - if_match: Option<&str>, - if_none_match: Option<&str>, - ) -> Result> { - let mut req = self.oss_head_object_request(path, false, if_match, if_none_match)?; + pub async fn oss_head_object(&self, path: &str, args: &OpStat) -> Result> { + let mut req = self.oss_head_object_request(path, false, args)?; self.sign(&mut req).await?; self.send(req).await @@ -431,8 +458,8 @@ impl OssCore { self.send(req).await } - pub async fn oss_delete_object(&self, path: &str) -> Result> { - let mut req = self.oss_delete_object_request(path)?; + pub async fn oss_delete_object(&self, path: &str, args: &OpDelete) -> Result> { + let mut req = self.oss_delete_object_request(path, args)?; self.sign(&mut req).await?; self.send(req).await } diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 224db8e4c0f7..599617d3018b 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -164,7 +164,10 @@ impl oio::MultipartWrite for OssWriter { impl oio::AppendWrite for OssWriter { async fn offset(&self) -> Result { - let resp = self.core.oss_head_object(&self.path, None, None).await?; + let resp = self + .core + .oss_head_object(&self.path, &OpStat::new()) + .await?; let status = resp.status(); match status { diff --git a/core/src/types/read/reader.rs b/core/src/types/read/reader.rs index 89505919e867..641bb9eaaebf 100644 --- a/core/src/types/read/reader.rs +++ b/core/src/types/read/reader.rs @@ -130,10 +130,16 @@ impl Reader { Bound::Unbounded => match self.size.load() { Some(v) => v, None => { + let mut op_stat = OpStat::new(); + + if let Some(v) = self.ctx.args().version() { + op_stat = op_stat.with_version(v); + } + let size = self .ctx .accessor() - .stat(self.ctx.path(), OpStat::new()) + .stat(self.ctx.path(), op_stat) .await? .into_metadata() .content_length();