diff --git a/tardis/Cargo.toml b/tardis/Cargo.toml index 8d2446d..8d65948 100644 --- a/tardis/Cargo.toml +++ b/tardis/Cargo.toml @@ -217,7 +217,8 @@ lettre = { version = "0.11", features = [ ], optional = true } # Object Storage -rust-s3 = { version = "0.33", optional = true } +# rust-s3 = { version = "0.33", optional = true } +rust-s3 = { git = "https://github.com/tuist/rust-s3.git", rev = "b80b231" , optional = true } anyhow = { version = "1.0", optional = true } # K8s diff --git a/tardis/src/os/os_client.rs b/tardis/src/os/os_client.rs index b4950c1..338cade 100644 --- a/tardis/src/os/os_client.rs +++ b/tardis/src/os/os_client.rs @@ -1,7 +1,9 @@ +use std::collections::HashMap; use std::ops::Deref; use async_trait::async_trait; use s3::creds::Credentials; +use s3::serde_types::Part; use s3::{Bucket, BucketConfiguration, Region}; use tracing::{error, info, trace}; @@ -106,19 +108,34 @@ impl TardisOSClient { self.get_client().object_multipart_uploads(path, content, content_type, bucket_name).await } - pub fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { + pub async fn initiate_multipart_upload(&self, path: &str, content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult { + trace!("[Tardis.OSClient] Initiate multipart upload {}", path); + self.get_client().initiate_multipart_upload(path, content_type, bucket_name).await + } + + pub async fn batch_build_create_presign_url(&self, path: &str, upload_id: &str, part_number: u32, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult> { + trace!("[Tardis.OSClient] Batch build create presign url {}", path); + self.get_client().batch_build_create_presign_url(path, upload_id, part_number, expire_sec, bucket_name).await + } + + pub async fn complete_multipart_upload(&self, path: &str, upload_id: &str, parts: Vec, bucket_name: Option<&str>) -> TardisResult<()> { + trace!("[Tardis.OSClient] Complete multipart upload {}", path); + self.get_client().complete_multipart_upload(path, upload_id, parts, bucket_name).await + } + + pub async fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { trace!("[Tardis.OSClient] Creating object url {}", path); - self.get_client().object_create_url(path, expire_sec, bucket_name) + self.get_client().object_create_url(path, expire_sec, bucket_name).await } - pub fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { + pub async fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { trace!("[Tardis.OSClient] Getting object url {}", path); - self.get_client().object_get_url(path, expire_sec, bucket_name) + self.get_client().object_get_url(path, expire_sec, bucket_name).await } - pub fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { + pub async fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { trace!("[Tardis.OSClient] Deleting object url {}", path); - self.get_client().object_delete_url(path, expire_sec, bucket_name) + self.get_client().object_delete_url(path, expire_sec, bucket_name).await } } @@ -136,11 +153,17 @@ trait TardisOSOperations { async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()>; - fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; + async fn initiate_multipart_upload(&self, path: &str, content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult; + + async fn batch_build_create_presign_url(&self, path: &str, upload_id: &str, part_number: u32, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult>; + + async fn complete_multipart_upload(&self, path: &str, upload_id: &str, parts: Vec, bucket_name: Option<&str>) -> TardisResult<()>; - fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; + async fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; - fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; + async fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; + + async fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult; } #[async_trait] @@ -277,18 +300,18 @@ impl TardisOSOperations for TardisOSS3Client { } let bucket = self.get_bucket(bucket_name)?; - let upload_id = bucket.initiate_multipart_upload(path, content_type.unwrap_or_default()).await?.upload_id; + let upload_id = self.initiate_multipart_upload(path, content_type, bucket_name).await?; let mut upload_parts = Vec::new(); for chunk_index in 0..chunk_count { let this_chunk = if chunk_count - 1 == chunk_index { size_of_last_chunk } else { CHUNK_SIZE }; - let upload_part_res = bucket + let upload_part_res: Part = bucket .put_multipart_chunk( content[(CHUNK_SIZE * chunk_index) as usize..(CHUNK_SIZE * chunk_index + this_chunk) as usize].to_vec(), path, (chunk_index as u32) + 1, &upload_id, - content_type.unwrap_or_default(), + content_type.unwrap_or("application/octet-stream"), ) .await?; @@ -298,16 +321,45 @@ impl TardisOSOperations for TardisOSS3Client { Ok(()) } - fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { - Ok(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None)?) + async fn initiate_multipart_upload(&self, path: &str, content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult { + Ok(self.get_bucket(bucket_name)?.initiate_multipart_upload(path, content_type.unwrap_or("application/octet-stream")).await?.upload_id) + } + + async fn batch_build_create_presign_url(&self, path: &str, upload_id: &str, part_number: u32, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult> { + let mut presign_url = vec![]; + for part_no in 0..part_number { + let mut custom_queries = HashMap::new(); + custom_queries.insert("upload_id".to_string(), upload_id.to_string()); + custom_queries.insert("part_number".to_string(), (part_no + 1).to_string()); + presign_url.push(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None, Some(custom_queries)).await?); + } + Ok(presign_url) + } + + async fn complete_multipart_upload(&self, path: &str, upload_id: &str, parts: Vec, bucket_name: Option<&str>) -> TardisResult<()> { + let bucket = self.get_bucket(bucket_name)?; + let mut part_params = vec![]; + for (i, etag) in parts.into_iter().enumerate() { + part_params.push(Part { + part_number: (i + 1) as u32, + etag, + }); + } + bucket.complete_multipart_upload(path, upload_id, part_params).await?; + + Ok(()) + } + + async fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { + Ok(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None, None).await?) } - fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { - Ok(self.get_bucket(bucket_name)?.presign_get(path, expire_sec, None)?) + async fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { + Ok(self.get_bucket(bucket_name)?.presign_get(path, expire_sec, None).await?) } - fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { - Ok(self.get_bucket(bucket_name)?.presign_delete(path, expire_sec)?) + async fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult { + Ok(self.get_bucket(bucket_name)?.presign_delete(path, expire_sec).await?) } } @@ -327,7 +379,7 @@ impl From for TardisError { fn from(error: s3::error::S3Error) -> Self { error!("[Tardis.OSClient] Error: {}", error.to_string()); match error { - s3::error::S3Error::Http(http_code, msg) => TardisError::custom(&format!("{http_code}"), &format!("[Tardis.OSClient] Error: {msg}"), "-1-tardis-os-error"), + s3::error::S3Error::Http(e) => TardisError::custom(ERROR_DEFAULT_CODE, &format!("[Tardis.OSClient] Error: {}", e), "-1-tardis-os-error"), _ => TardisError::custom(ERROR_DEFAULT_CODE, &format!("[Tardis.OSClient] Error: {error:?}"), "-1-tardis-os-error"), } } diff --git a/tardis/tests/test_os_client.rs b/tardis/tests/test_os_client.rs index d601e8a..183f667 100644 --- a/tardis/tests/test_os_client.rs +++ b/tardis/tests/test_os_client.rs @@ -18,14 +18,14 @@ async fn test_os_client() -> TardisResult<()> { TardisFuns::os().bucket_create_simple(bucket_name, true).await?; let resp = TardisFuns::os().bucket_create_simple(bucket_name, true).await; - assert_eq!(resp.err().unwrap().code, "409"); + assert_eq!(resp.err().unwrap().code, "-1"); TardisFuns::os().object_create("test/test.txt", "I want to go to S3 测试".as_bytes(), None, Some(bucket_name)).await?; let data = TardisFuns::os().object_get("test/test.txt", Some(bucket_name)).await?; assert_eq!(String::from_utf8(data).unwrap(), "I want to go to S3 测试"); - info!("object_get_url = {}", TardisFuns::os().object_get_url("test/test.txt", 60, Some(bucket_name))?); + info!("object_get_url = {}", TardisFuns::os().object_get_url("test/test.txt", 60, Some(bucket_name)).await?); //info!("object_create_url = {}", TardisFuns::os().object_create_url("test/test2.txt", 1, Some(bucket_name.clone()))?); //