From 3017b3830d12a9c078a9d74930d8eacfd8e76c84 Mon Sep 17 00:00:00 2001 From: ZzIsGod1019 <1498852723@qq.com> Date: Fri, 31 May 2024 17:17:57 +0800 Subject: [PATCH 1/2] os: support upload multipart object --- tardis/src/os/os_client.rs | 59 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tardis/src/os/os_client.rs b/tardis/src/os/os_client.rs index 32e68158..58c4d485 100644 --- a/tardis/src/os/os_client.rs +++ b/tardis/src/os/os_client.rs @@ -101,6 +101,11 @@ impl TardisOSClient { self.get_client().object_delete(path, bucket_name).await } + pub async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()> { + trace!("[Tardis.OSClient] Multipart uploads object {}", path); + self.get_client().object_multipart_uploads(path, content, content_type, bucket_name).await + } + pub fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { trace!("[Tardis.OSClient] Creating object url {}", path); self.get_client().object_create_url(path, expire_sec, bucket_name) @@ -129,6 +134,8 @@ trait TardisOSOperations { async fn object_delete(&self, path: &str, bucket_name: Option<&str>) -> TardisResult<()>; + async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()>; + fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; @@ -235,6 +242,58 @@ impl TardisOSOperations for TardisOSS3Client { } } + async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()> { + const CHUNK_SIZE: u64 = 1024 * 1024 * 5; + const MAX_CHUNKS: u64 = 10000; + + let file_size = content.len() as u64; + let mut chunk_count = (file_size / CHUNK_SIZE) + 1; + let mut size_of_last_chunk = file_size % CHUNK_SIZE; + if size_of_last_chunk == 0 { + size_of_last_chunk = CHUNK_SIZE; + chunk_count -= 1; + } + if file_size == 0 { + return Err(TardisError::custom( + "500", + &format!( + "[Tardis.OSClient] Failed to multipart uploads object {}:{} with error [Bad file size.]", + bucket_name.unwrap_or_default(), + path + ), + "-1-tardis-os-object-multipart-uploads-error", + )); + } + if chunk_count > MAX_CHUNKS { + return Err(TardisError::custom( + "500", + &format!( + "[Tardis.OSClient] Failed to multipart uploads object {}:{} with error [Too many chunks! Try increasing your chunk size.]", + bucket_name.unwrap_or_default(), + path + ), + "-1-tardis-os-object-multipart-uploads-error", + )); + } + + let bucket = self.get_bucket(bucket_name)?; + let upload_id = bucket.initiate_multipart_upload(path, content_type.unwrap_or_default()).await?.upload_id; + + let mut upload_parts = Vec::new(); + for chunk_index in 0..chunk_count { + let this_chunk = if chunk_count - 1 == chunk_index { + size_of_last_chunk + } else { + CHUNK_SIZE + }; + let upload_part_res = bucket.put_multipart_chunk(content[(CHUNK_SIZE * chunk_index) as usize .. (CHUNK_SIZE * chunk_index + this_chunk) as usize].to_vec(), path, (chunk_index as u32) + 1, &upload_id, content_type.unwrap_or_default()).await?; + + upload_parts.push(upload_part_res); + } + bucket.complete_multipart_upload(path, &upload_id, upload_parts).await?; + Ok(()) + } + fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { Ok(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None)?) } From 8fdab595d52e5d8939bac2a8f858b0594b5220f9 Mon Sep 17 00:00:00 2001 From: ZzIsGod1019 <1498852723@qq.com> Date: Fri, 31 May 2024 17:19:13 +0800 Subject: [PATCH 2/2] fmt --- tardis/src/os/os_client.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/tardis/src/os/os_client.rs b/tardis/src/os/os_client.rs index 58c4d485..b4950c1a 100644 --- a/tardis/src/os/os_client.rs +++ b/tardis/src/os/os_client.rs @@ -281,12 +281,16 @@ impl TardisOSOperations for TardisOSS3Client { let mut upload_parts = Vec::new(); for chunk_index in 0..chunk_count { - let this_chunk = if chunk_count - 1 == chunk_index { - size_of_last_chunk - } else { - CHUNK_SIZE - }; - let upload_part_res = bucket.put_multipart_chunk(content[(CHUNK_SIZE * chunk_index) as usize .. (CHUNK_SIZE * chunk_index + this_chunk) as usize].to_vec(), path, (chunk_index as u32) + 1, &upload_id, content_type.unwrap_or_default()).await?; + let this_chunk = if chunk_count - 1 == chunk_index { size_of_last_chunk } else { CHUNK_SIZE }; + let upload_part_res = bucket + .put_multipart_chunk( + content[(CHUNK_SIZE * chunk_index) as usize..(CHUNK_SIZE * chunk_index + this_chunk) as usize].to_vec(), + path, + (chunk_index as u32) + 1, + &upload_id, + content_type.unwrap_or_default(), + ) + .await?; upload_parts.push(upload_part_res); }