Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object: support multipart upload #124

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tardis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ lettre = { version = "0.11", features = [
], optional = true }

# Object Storage
rust-s3 = { version = "0.33", optional = true }
# rust-s3 = { version = "0.33", optional = true }
rust-s3 = { git = "https://github.com/tuist/rust-s3.git", rev = "b80b231" , optional = true }
anyhow = { version = "1.0", optional = true }

# K8s
Expand Down
90 changes: 71 additions & 19 deletions tardis/src/os/os_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::ops::Deref;

use async_trait::async_trait;
use s3::creds::Credentials;
use s3::serde_types::Part;
use s3::{Bucket, BucketConfiguration, Region};
use tracing::{error, info, trace};

Expand Down Expand Up @@ -106,19 +108,34 @@ impl TardisOSClient {
self.get_client().object_multipart_uploads(path, content, content_type, bucket_name).await
}

pub fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
pub async fn initiate_multipart_upload(&self, path: &str, content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<String> {
trace!("[Tardis.OSClient] Initiate multipart upload {}", path);
self.get_client().initiate_multipart_upload(path, content_type, bucket_name).await
}

pub async fn batch_build_create_presign_url(&self, path: &str, upload_id: &str, part_number: u32, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<Vec<String>> {
trace!("[Tardis.OSClient] Batch build create presign url {}", path);
self.get_client().batch_build_create_presign_url(path, upload_id, part_number, expire_sec, bucket_name).await
}

pub async fn complete_multipart_upload(&self, path: &str, upload_id: &str, parts: Vec<String>, bucket_name: Option<&str>) -> TardisResult<()> {
trace!("[Tardis.OSClient] Complete multipart upload {}", path);
self.get_client().complete_multipart_upload(path, upload_id, parts, bucket_name).await
}

pub async fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
trace!("[Tardis.OSClient] Creating object url {}", path);
self.get_client().object_create_url(path, expire_sec, bucket_name)
self.get_client().object_create_url(path, expire_sec, bucket_name).await
}

pub fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
pub async fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
trace!("[Tardis.OSClient] Getting object url {}", path);
self.get_client().object_get_url(path, expire_sec, bucket_name)
self.get_client().object_get_url(path, expire_sec, bucket_name).await
}

pub fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
pub async fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
trace!("[Tardis.OSClient] Deleting object url {}", path);
self.get_client().object_delete_url(path, expire_sec, bucket_name)
self.get_client().object_delete_url(path, expire_sec, bucket_name).await
}
}

Expand All @@ -136,11 +153,17 @@ trait TardisOSOperations {

async fn object_multipart_uploads(&self, path: &str, content: &[u8], content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<()>;

fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;
async fn initiate_multipart_upload(&self, path: &str, content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<String>;

async fn batch_build_create_presign_url(&self, path: &str, upload_id: &str, part_number: u32, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<Vec<String>>;

async fn complete_multipart_upload(&self, path: &str, upload_id: &str, parts: Vec<String>, bucket_name: Option<&str>) -> TardisResult<()>;

fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;
async fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;

fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;
async fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;

async fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String>;
}

#[async_trait]
Expand Down Expand Up @@ -277,18 +300,18 @@ impl TardisOSOperations for TardisOSS3Client {
}

let bucket = self.get_bucket(bucket_name)?;
let upload_id = bucket.initiate_multipart_upload(path, content_type.unwrap_or_default()).await?.upload_id;
let upload_id = self.initiate_multipart_upload(path, content_type, bucket_name).await?;

let mut upload_parts = Vec::new();
for chunk_index in 0..chunk_count {
let this_chunk = if chunk_count - 1 == chunk_index { size_of_last_chunk } else { CHUNK_SIZE };
let upload_part_res = bucket
let upload_part_res: Part = bucket
.put_multipart_chunk(
content[(CHUNK_SIZE * chunk_index) as usize..(CHUNK_SIZE * chunk_index + this_chunk) as usize].to_vec(),
path,
(chunk_index as u32) + 1,
&upload_id,
content_type.unwrap_or_default(),
content_type.unwrap_or("application/octet-stream"),
)
.await?;

Expand All @@ -298,16 +321,45 @@ impl TardisOSOperations for TardisOSS3Client {
Ok(())
}

fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None)?)
async fn initiate_multipart_upload(&self, path: &str, content_type: Option<&str>, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.initiate_multipart_upload(path, content_type.unwrap_or("application/octet-stream")).await?.upload_id)
}

async fn batch_build_create_presign_url(&self, path: &str, upload_id: &str, part_number: u32, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<Vec<String>> {
let mut presign_url = vec![];
for part_no in 0..part_number {
let mut custom_queries = HashMap::new();
custom_queries.insert("upload_id".to_string(), upload_id.to_string());
custom_queries.insert("part_number".to_string(), (part_no + 1).to_string());
presign_url.push(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None, Some(custom_queries)).await?);
}
Ok(presign_url)
}

async fn complete_multipart_upload(&self, path: &str, upload_id: &str, parts: Vec<String>, bucket_name: Option<&str>) -> TardisResult<()> {
let bucket = self.get_bucket(bucket_name)?;
let mut part_params = vec![];
for (i, etag) in parts.into_iter().enumerate() {
part_params.push(Part {
part_number: (i + 1) as u32,
etag,
});
}
bucket.complete_multipart_upload(path, upload_id, part_params).await?;

Ok(())
}

async fn object_create_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_put(path, expire_sec, None, None).await?)
}

fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_get(path, expire_sec, None)?)
async fn object_get_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_get(path, expire_sec, None).await?)
}

fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_delete(path, expire_sec)?)
async fn object_delete_url(&self, path: &str, expire_sec: u32, bucket_name: Option<&str>) -> TardisResult<String> {
Ok(self.get_bucket(bucket_name)?.presign_delete(path, expire_sec).await?)
}
}

Expand All @@ -327,7 +379,7 @@ impl From<s3::error::S3Error> for TardisError {
fn from(error: s3::error::S3Error) -> Self {
error!("[Tardis.OSClient] Error: {}", error.to_string());
match error {
s3::error::S3Error::Http(http_code, msg) => TardisError::custom(&format!("{http_code}"), &format!("[Tardis.OSClient] Error: {msg}"), "-1-tardis-os-error"),
s3::error::S3Error::Http(e) => TardisError::custom(ERROR_DEFAULT_CODE, &format!("[Tardis.OSClient] Error: {}", e), "-1-tardis-os-error"),
_ => TardisError::custom(ERROR_DEFAULT_CODE, &format!("[Tardis.OSClient] Error: {error:?}"), "-1-tardis-os-error"),
}
}
Expand Down
4 changes: 2 additions & 2 deletions tardis/tests/test_os_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ async fn test_os_client() -> TardisResult<()> {

TardisFuns::os().bucket_create_simple(bucket_name, true).await?;
let resp = TardisFuns::os().bucket_create_simple(bucket_name, true).await;
assert_eq!(resp.err().unwrap().code, "409");
assert_eq!(resp.err().unwrap().code, "-1");

TardisFuns::os().object_create("test/test.txt", "I want to go to S3 测试".as_bytes(), None, Some(bucket_name)).await?;

let data = TardisFuns::os().object_get("test/test.txt", Some(bucket_name)).await?;
assert_eq!(String::from_utf8(data).unwrap(), "I want to go to S3 测试");

info!("object_get_url = {}", TardisFuns::os().object_get_url("test/test.txt", 60, Some(bucket_name))?);
info!("object_get_url = {}", TardisFuns::os().object_get_url("test/test.txt", 60, Some(bucket_name)).await?);

//info!("object_create_url = {}", TardisFuns::os().object_create_url("test/test2.txt", 1, Some(bucket_name.clone()))?);
//
Expand Down
Loading