Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(aliyun-drive): rewrite writer part #4744

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ services-vercel-blob = []
services-webdav = []
services-webhdfs = []
services-yandex-disk = []
services-aliyun-drive = ["dep:sha1"]
services-aliyun-drive = []

[lib]
bench = false
Expand Down
17 changes: 0 additions & 17 deletions core/src/services/aliyun_drive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,6 @@ pub struct AliyunDriveConfig {
///
/// Fallback to default if not set or no other drives can be found.
pub drive_type: String,
/// rapid_upload of this backend.
///
/// Skip uploading files that are already in the drive by hashing their content.
///
/// Only works under the write_once operation.
pub rapid_upload: bool,
}

impl Debug for AliyunDriveConfig {
Expand Down Expand Up @@ -160,13 +154,6 @@ impl AliyunDriveBuilder {
self
}

/// Set rapid_upload of this backend.
pub fn rapid_upload(&mut self, rapid_upload: bool) -> &mut Self {
self.config.rapid_upload = rapid_upload;

self
}

/// Specify the http client that used by this service.
///
/// # Notes
Expand Down Expand Up @@ -243,15 +230,11 @@ impl Builder for AliyunDriveBuilder {
};
debug!("backend use drive_type {:?}", drive_type);

let rapid_upload = self.config.rapid_upload;
debug!("backend use rapid_upload {}", rapid_upload);

Ok(AliyunDriveBackend {
core: Arc::new(AliyunDriveCore {
endpoint: "https://openapi.alipan.com".to_string(),
root,
drive_type,
rapid_upload,
signer: Arc::new(Mutex::new(AliyunDriveSigner {
drive_id: None,
sign,
Expand Down
3 changes: 1 addition & 2 deletions core/src/services/aliyun_drive/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ pub struct AliyunDriveCore {
pub endpoint: String,
pub root: String,
pub drive_type: DriveType,
pub rapid_upload: bool,

pub signer: Arc<Mutex<AliyunDriveSigner>>,
pub client: HttpClient,
Expand Down Expand Up @@ -475,8 +474,8 @@ pub struct UploadUrlResponse {
pub struct CreateResponse {
pub file_id: String,
pub upload_id: Option<String>,
pub rapid_upload: Option<bool>,
pub part_info_list: Option<Vec<PartInfo>>,
pub exist: Option<bool>,
}

#[derive(Serialize, Deserialize)]
Expand Down
5 changes: 0 additions & 5 deletions core/src/services/aliyun_drive/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ This service can be used to:
- `client_secret`: Set the client_secret for backend.
- `refresh_token`: Set the refresh_token for backend.
- `drive_type`: Set the drive_type for backend.
- `rapid_upload`: Set the rapid_upload for backend.

Refer to [`AliyunDriveBuilder`]`s public API docs for more information.

Expand Down Expand Up @@ -54,10 +53,6 @@ async fn main() -> Result<()> {
//
// Fallback to the default type if no other types found.
builder.drive_type("resource");
// Set the rapid_upload.
//
// Works only under the write_once operation for now.
builder.rapid_upload(true);

let op: Operator = Operator::new(builder)?.finish();

Expand Down
1 change: 1 addition & 0 deletions core/src/services/aliyun_drive/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub async fn parse_error(res: Response<Buffer>) -> Result<Error> {
Some(code) if code == "PreHashMatched" => (ErrorKind::IsSameFile, false),
_ => (ErrorKind::Unexpected, false),
},
409 => (ErrorKind::AlreadyExists, false),
429 => match code {
Some(code) if code == "TooManyRequests" => (ErrorKind::RateLimited, true),
_ => (ErrorKind::Unexpected, false),
Expand Down
160 changes: 45 additions & 115 deletions core/src/services/aliyun_drive/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use base64::engine::general_purpose;
use base64::Engine;
use super::core::{AliyunDriveCore, UploadUrlResponse};
use crate::{
raw::*,
services::aliyun_drive::core::{CheckNameMode, CreateResponse, CreateType},
*,
};
use bytes::Buf;
use md5::Digest;
use md5::Md5;
use sha1::Sha1;
use std::sync::Arc;
use tokio::sync::RwLock;

use super::core::AliyunDriveCore;
use super::core::RapidUpload;
use super::core::UploadUrlResponse;
use crate::raw::*;
use crate::services::aliyun_drive::core::CheckNameMode;
use crate::services::aliyun_drive::core::CreateResponse;
use crate::services::aliyun_drive::core::CreateType;
use crate::*;

pub type AliyunDriveWriters = oio::MultipartWriter<AliyunDriveWriter>;

pub struct AliyunDriveWriter {
Expand Down Expand Up @@ -72,91 +63,39 @@ impl AliyunDriveWriter {
Ok(file_id.clone())
}

async fn get_rapid_upload(
&self,
size: Option<u64>,
body: Option<crate::Buffer>,
pre_hash: bool,
) -> Result<Option<RapidUpload>> {
let Some(size) = size else {
return Ok(None);
};
let Some(body) = body else {
return Ok(None);
};
if pre_hash && size > 1024 * 100 {
return Ok(Some(RapidUpload {
pre_hash: Some(format!(
"{:x}",
Sha1::new_with_prefix(body.slice(0..1024).to_vec()).finalize()
)),
content_hash: None,
proof_code: None,
}));
}
let (token, _) = self.core.get_token_and_drive().await?;
let Ok(index) = u64::from_str_radix(
&format!("{:x}", Md5::new_with_prefix(token.unwrap()).finalize())[0..16],
16,
) else {
return Err(Error::new(
ErrorKind::Unexpected,
"cannot parse hexadecimal",
));
};
let size = size as usize;
let index = index as usize % size;
let (range_start, range_end) = if index + 8 > size {
(index, size)
} else {
(index, index + 8)
};

Ok(Some(RapidUpload {
pre_hash: None,
content_hash: Some(format!(
"{:x}",
Sha1::new_with_prefix(body.to_vec()).finalize()
)),
proof_code: Some(
general_purpose::STANDARD.encode(body.to_bytes().slice(range_start..range_end)),
),
}))
}

async fn write(
&self,
size: Option<u64>,
body: Option<crate::Buffer>,
pre_hash: bool,
body: Option<Buffer>,
upload_url: Option<&str>,
) -> Result<(bool, Option<String>)> {
) -> Result<Option<String>> {
if let Some(upload_url) = upload_url {
let Some(body) = body else {
return Err(Error::new(
ErrorKind::Unexpected,
"cannot upload without body",
));
};
self.core.upload(upload_url, body).await?;
return Ok((false, None));
if let Err(err) = self.core.upload(upload_url, body).await {
if err.kind() != ErrorKind::AlreadyExists {
return Err(err);
}
};
return Ok(None);
}

let res = self
.core
.create_with_rapid_upload(
.create(
Some(&self.parent_file_id),
&self.name,
CreateType::File,
CheckNameMode::Refuse,
size,
self.get_rapid_upload(size, body.clone(), pre_hash).await?,
)
.await;

let res = match res {
Err(err) if err.kind() == ErrorKind::IsSameFile => {
return Ok((true, None));
return Ok(None);
}
Err(err) => {
return Err(err);
Expand All @@ -167,59 +106,53 @@ impl AliyunDriveWriter {
let output: CreateResponse =
serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
self.write_file_id(output.file_id).await;
if output.exist.is_some_and(|x| x) {
return Err(Error::new(ErrorKind::AlreadyExists, "file exists"));
}

if output.upload_id.is_some() && output.rapid_upload.is_some_and(|x| !x) {
if output.upload_id.is_some() {
if let Some(body) = body {
let Some(part_info_list) = output.part_info_list else {
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
};
if part_info_list.is_empty() {
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
}
self.core
.upload(&part_info_list[0].upload_url, body)
.await?;
if let Err(err) = self.core.upload(&part_info_list[0].upload_url, body).await {
if err.kind() != ErrorKind::AlreadyExists {
return Err(err);
}
}
}
}

Ok((false, output.upload_id))
Ok(output.upload_id)
}
}

impl oio::MultipartWrite for AliyunDriveWriter {
async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> {
let upload_id = if self.core.rapid_upload {
let (rapid, mut upload_id) = self
.write(Some(size), Some(body.clone()), true, None)
.await?;

let size = if rapid { Some(size) } else { None };
let (_, new_upload_id) = self.write(size, Some(body), false, None).await?;
async fn complete(&self, upload_id: &str) -> Result<Buffer> {
let file_id = self.read_file_id().await?;

if new_upload_id.is_some() {
upload_id = new_upload_id;
}
self.core.complete(&file_id, upload_id).await
}

let Some(upload_id) = upload_id else {
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_id"));
};
upload_id
} else {
let upload_id = self.initiate_part().await?;
self.write_part(&upload_id, 0, size, body).await?;
upload_id
};
async fn delete(&self) -> Result<()> {
let file_id = self.read_file_id().await?;

self.core.complete(&file_id, &upload_id).await?;
self.core.delete_path(&file_id).await
}
}

impl oio::MultipartWrite for AliyunDriveWriter {
async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> {
let upload_id = self.initiate_part().await?;
self.write_part(&upload_id, 0, size, body).await?;

self.complete(&upload_id).await?;
Ok(())
}

async fn initiate_part(&self) -> Result<String> {
let (_, upload_id) = self.write(None, None, false, None).await?;

let Some(upload_id) = upload_id else {
let Some(upload_id) = self.write(None, None).await? else {
return Err(Error::new(ErrorKind::Unsupported, "cannot find upload_id"));
};

Expand Down Expand Up @@ -250,7 +183,7 @@ impl oio::MultipartWrite for AliyunDriveWriter {
if part_info_list.is_empty() {
return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
}
self.write(None, Some(body), false, Some(&part_info_list[0].upload_url))
self.write(Some(body), Some(&part_info_list[0].upload_url))
.await?;

Ok(oio::MultipartPart {
Expand All @@ -261,15 +194,12 @@ impl oio::MultipartWrite for AliyunDriveWriter {
}

async fn complete_part(&self, upload_id: &str, _parts: &[oio::MultipartPart]) -> Result<()> {
let file_id = self.read_file_id().await?;
self.core.complete(&file_id, upload_id).await?;
self.complete(upload_id).await?;

Ok(())
}

async fn abort_part(&self, _upload_id: &str) -> Result<()> {
let file_id = self.read_file_id().await?;

self.core.delete_path(&file_id).await
self.delete().await
}
}
Loading