Skip to content

Commit

Permalink
Merge pull request #123 from ZzIsGod1019/zz-os-0601
Browse files Browse the repository at this point in the history
os: support upload multipart object
  • Loading branch information
4t145 authored May 31, 2024
2 parents 9dffda9 + 8fdab59 commit 163bf0d
Showing 1 changed file with 63 additions and 0 deletions.
63 changes: 63 additions & 0 deletions tardis/src/os/os_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ impl TardisOSClient {
self.get_client().object_delete(path, bucket_name).await
}

pub async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()> {
trace!("[Tardis.OSClient] Multipart uploads object {}", path);
self.get_client().object_multipart_uploads(path, content, content_type, bucket_name).await
}

pub fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
trace!("[Tardis.OSClient] Creating object url {}", path);
self.get_client().object_create_url(path, expire_sec, bucket_name)
Expand Down Expand Up @@ -129,6 +134,8 @@ trait TardisOSOperations {

async fn object_delete(&self, path: &str, bucket_name: Option<&str>) -> TardisResult<()>;

async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()>;

fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;

fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;
Expand Down Expand Up @@ -235,6 +242,62 @@ impl TardisOSOperations for TardisOSS3Client {
}
}

async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()> {
const CHUNK_SIZE: u64 = 1024 * 1024 * 5;
const MAX_CHUNKS: u64 = 10000;

let file_size = content.len() as u64;
let mut chunk_count = (file_size / CHUNK_SIZE) + 1;
let mut size_of_last_chunk = file_size % CHUNK_SIZE;
if size_of_last_chunk == 0 {
size_of_last_chunk = CHUNK_SIZE;
chunk_count -= 1;
}
if file_size == 0 {
return Err(TardisError::custom(
"500",
&format!(
"[Tardis.OSClient] Failed to multipart uploads object {}:{} with error [Bad file size.]",
bucket_name.unwrap_or_default(),
path
),
"-1-tardis-os-object-multipart-uploads-error",
));
}
if chunk_count > MAX_CHUNKS {
return Err(TardisError::custom(
"500",
&format!(
"[Tardis.OSClient] Failed to multipart uploads object {}:{} with error [Too many chunks! Try increasing your chunk size.]",
bucket_name.unwrap_or_default(),
path
),
"-1-tardis-os-object-multipart-uploads-error",
));
}

let bucket = self.get_bucket(bucket_name)?;
let upload_id = bucket.initiate_multipart_upload(path, content_type.unwrap_or_default()).await?.upload_id;

let mut upload_parts = Vec::new();
for chunk_index in 0..chunk_count {
let this_chunk = if chunk_count - 1 == chunk_index { size_of_last_chunk } else { CHUNK_SIZE };
let upload_part_res = bucket
.put_multipart_chunk(
content[(CHUNK_SIZE * chunk_index) as usize..(CHUNK_SIZE * chunk_index + this_chunk) as usize].to_vec(),
path,
(chunk_index as u32) + 1,
&upload_id,
content_type.unwrap_or_default(),
)
.await?;

upload_parts.push(upload_part_res);
}
bucket.complete_multipart_upload(path, &upload_id, upload_parts).await?;
Ok(())
}

fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None)?)
}
Expand Down

0 comments on commit 163bf0d

Please sign in to comment.