diff --git a/tardis/src/os/os_client.rs b/tardis/src/os/os_client.rs index 32e68158..b4950c1a 100644 --- a/tardis/src/os/os_client.rs +++ b/tardis/src/os/os_client.rs @@ -101,6 +101,11 @@ impl TardisOSClient { self.get_client().object_delete(path, bucket_name).await } + pub async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()> { + trace!("[Tardis.OSClient] Multipart uploads object {}", path); + self.get_client().object_multipart_uploads(path, content, content_type, bucket_name).await + } + pub fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { trace!("[Tardis.OSClient] Creating object url {}", path); self.get_client().object_create_url(path, expire_sec, bucket_name) @@ -129,6 +134,8 @@ trait TardisOSOperations { async fn object_delete(&self, path: &str, bucket_name: Option<&str>) -> TardisResult<()>; + async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()>; + fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; @@ -235,6 +242,62 @@ impl TardisOSOperations for TardisOSS3Client { } } + async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()> { + const CHUNK_SIZE: u64 = 1024 * 1024 * 5; + const MAX_CHUNKS: u64 = 10000; + + let file_size = content.len() as u64; + let mut chunk_count = (file_size / CHUNK_SIZE) + 1; + let mut size_of_last_chunk = file_size % CHUNK_SIZE; + if size_of_last_chunk == 0 { + size_of_last_chunk = CHUNK_SIZE; + chunk_count -= 1; + } + if file_size == 0 { + return Err(TardisError::custom( + "500", + &format!( + "[Tardis.OSClient] Failed to multipart uploads object {}:{} with error [Bad file size.]", + bucket_name.unwrap_or_default(), + path + ), + "-1-tardis-os-object-multipart-uploads-error", + )); + } + if chunk_count > MAX_CHUNKS { + return Err(TardisError::custom( + "500", + &format!( + "[Tardis.OSClient] Failed to multipart uploads object {}:{} with error [Too many chunks! Try increasing your chunk size.]", + bucket_name.unwrap_or_default(), + path + ), + "-1-tardis-os-object-multipart-uploads-error", + )); + } + + let bucket = self.get_bucket(bucket_name)?; + let upload_id = bucket.initiate_multipart_upload(path, content_type.unwrap_or_default()).await?.upload_id; + + let mut upload_parts = Vec::new(); + for chunk_index in 0..chunk_count { + let this_chunk = if chunk_count - 1 == chunk_index { size_of_last_chunk } else { CHUNK_SIZE }; + let upload_part_res = bucket + .put_multipart_chunk( + content[(CHUNK_SIZE * chunk_index) as usize..(CHUNK_SIZE * chunk_index + this_chunk) as usize].to_vec(), + path, + (chunk_index as u32) + 1, + &upload_id, + content_type.unwrap_or_default(), + ) + .await?; + + upload_parts.push(upload_part_res); + } + bucket.complete_multipart_upload(path, &upload_id, upload_parts).await?; + Ok(()) + } + fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { Ok(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None)?) }