diff --git a/core/Cargo.toml b/core/Cargo.toml index 73c7fea9268..d49b9de04e0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -193,7 +193,7 @@ services-vercel-blob = [] services-webdav = [] services-webhdfs = [] services-yandex-disk = [] -services-aliyun-drive = ["dep:sha1"] +services-aliyun-drive = [] [lib] bench = false diff --git a/core/src/services/aliyun_drive/backend.rs b/core/src/services/aliyun_drive/backend.rs index 766c0599556..9974757bca9 100644 --- a/core/src/services/aliyun_drive/backend.rs +++ b/core/src/services/aliyun_drive/backend.rs @@ -75,12 +75,6 @@ pub struct AliyunDriveConfig { /// /// Fallback to default if not set or no other drives can be found. pub drive_type: String, - /// rapid_upload of this backend. - /// - /// Skip uploading files that are already in the drive by hashing their content. - /// - /// Only works under the write_once operation. - pub rapid_upload: bool, } impl Debug for AliyunDriveConfig { @@ -160,13 +154,6 @@ impl AliyunDriveBuilder { self } - /// Set rapid_upload of this backend. - pub fn rapid_upload(&mut self, rapid_upload: bool) -> &mut Self { - self.config.rapid_upload = rapid_upload; - - self - } - /// Specify the http client that used by this service. /// /// # Notes @@ -243,15 +230,11 @@ impl Builder for AliyunDriveBuilder { }; debug!("backend use drive_type {:?}", drive_type); - let rapid_upload = self.config.rapid_upload; - debug!("backend use rapid_upload {}", rapid_upload); - Ok(AliyunDriveBackend { core: Arc::new(AliyunDriveCore { endpoint: "https://openapi.alipan.com".to_string(), root, drive_type, - rapid_upload, signer: Arc::new(Mutex::new(AliyunDriveSigner { drive_id: None, sign, diff --git a/core/src/services/aliyun_drive/core.rs b/core/src/services/aliyun_drive/core.rs index 51f30665c7f..00ac06b9269 100644 --- a/core/src/services/aliyun_drive/core.rs +++ b/core/src/services/aliyun_drive/core.rs @@ -63,7 +63,6 @@ pub struct AliyunDriveCore { pub endpoint: String, pub root: String, pub drive_type: DriveType, - pub rapid_upload: bool, pub signer: Arc>, pub client: HttpClient, @@ -475,8 +474,8 @@ pub struct UploadUrlResponse { pub struct CreateResponse { pub file_id: String, pub upload_id: Option, - pub rapid_upload: Option, pub part_info_list: Option>, + pub exist: Option, } #[derive(Serialize, Deserialize)] diff --git a/core/src/services/aliyun_drive/docs.md b/core/src/services/aliyun_drive/docs.md index b356ebe3f30..e963fd8a111 100644 --- a/core/src/services/aliyun_drive/docs.md +++ b/core/src/services/aliyun_drive/docs.md @@ -21,7 +21,6 @@ This service can be used to: - `client_secret`: Set the client_secret for backend. - `refresh_token`: Set the refresh_token for backend. - `drive_type`: Set the drive_type for backend. -- `rapid_upload`: Set the rapid_upload for backend. Refer to [`AliyunDriveBuilder`]`s public API docs for more information. @@ -54,10 +53,6 @@ async fn main() -> Result<()> { // // Fallback to the default type if no other types found. builder.drive_type("resource"); - // Set the rapid_upload. - // - // Works only under the write_once operation for now. - builder.rapid_upload(true); let op: Operator = Operator::new(builder)?.finish(); diff --git a/core/src/services/aliyun_drive/error.rs b/core/src/services/aliyun_drive/error.rs index fa343e4f6fa..ec2ad85e08f 100644 --- a/core/src/services/aliyun_drive/error.rs +++ b/core/src/services/aliyun_drive/error.rs @@ -41,6 +41,7 @@ pub async fn parse_error(res: Response) -> Result { Some(code) if code == "PreHashMatched" => (ErrorKind::IsSameFile, false), _ => (ErrorKind::Unexpected, false), }, + 409 => (ErrorKind::AlreadyExists, false), 429 => match code { Some(code) if code == "TooManyRequests" => (ErrorKind::RateLimited, true), _ => (ErrorKind::Unexpected, false), diff --git a/core/src/services/aliyun_drive/writer.rs b/core/src/services/aliyun_drive/writer.rs index 174c52a12c7..c1a3efa947e 100644 --- a/core/src/services/aliyun_drive/writer.rs +++ b/core/src/services/aliyun_drive/writer.rs @@ -15,25 +15,16 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - -use base64::engine::general_purpose; -use base64::Engine; +use super::core::{AliyunDriveCore, UploadUrlResponse}; +use crate::{ + raw::*, + services::aliyun_drive::core::{CheckNameMode, CreateResponse, CreateType}, + *, +}; use bytes::Buf; -use md5::Digest; -use md5::Md5; -use sha1::Sha1; +use std::sync::Arc; use tokio::sync::RwLock; -use super::core::AliyunDriveCore; -use super::core::RapidUpload; -use super::core::UploadUrlResponse; -use crate::raw::*; -use crate::services::aliyun_drive::core::CheckNameMode; -use crate::services::aliyun_drive::core::CreateResponse; -use crate::services::aliyun_drive::core::CreateType; -use crate::*; - pub type AliyunDriveWriters = oio::MultipartWriter; pub struct AliyunDriveWriter { @@ -72,65 +63,11 @@ impl AliyunDriveWriter { Ok(file_id.clone()) } - async fn get_rapid_upload( - &self, - size: Option, - body: Option, - pre_hash: bool, - ) -> Result> { - let Some(size) = size else { - return Ok(None); - }; - let Some(body) = body else { - return Ok(None); - }; - if pre_hash && size > 1024 * 100 { - return Ok(Some(RapidUpload { - pre_hash: Some(format!( - "{:x}", - Sha1::new_with_prefix(body.slice(0..1024).to_vec()).finalize() - )), - content_hash: None, - proof_code: None, - })); - } - let (token, _) = self.core.get_token_and_drive().await?; - let Ok(index) = u64::from_str_radix( - &format!("{:x}", Md5::new_with_prefix(token.unwrap()).finalize())[0..16], - 16, - ) else { - return Err(Error::new( - ErrorKind::Unexpected, - "cannot parse hexadecimal", - )); - }; - let size = size as usize; - let index = index as usize % size; - let (range_start, range_end) = if index + 8 > size { - (index, size) - } else { - (index, index + 8) - }; - - Ok(Some(RapidUpload { - pre_hash: None, - content_hash: Some(format!( - "{:x}", - Sha1::new_with_prefix(body.to_vec()).finalize() - )), - proof_code: Some( - general_purpose::STANDARD.encode(body.to_bytes().slice(range_start..range_end)), - ), - })) - } - async fn write( &self, - size: Option, - body: Option, - pre_hash: bool, + body: Option, upload_url: Option<&str>, - ) -> Result<(bool, Option)> { + ) -> Result> { if let Some(upload_url) = upload_url { let Some(body) = body else { return Err(Error::new( @@ -138,25 +75,27 @@ impl AliyunDriveWriter { "cannot upload without body", )); }; - self.core.upload(upload_url, body).await?; - return Ok((false, None)); + if let Err(err) = self.core.upload(upload_url, body).await { + if err.kind() != ErrorKind::AlreadyExists { + return Err(err); + } + }; + return Ok(None); } let res = self .core - .create_with_rapid_upload( + .create( Some(&self.parent_file_id), &self.name, CreateType::File, CheckNameMode::Refuse, - size, - self.get_rapid_upload(size, body.clone(), pre_hash).await?, ) .await; let res = match res { Err(err) if err.kind() == ErrorKind::IsSameFile => { - return Ok((true, None)); + return Ok(None); } Err(err) => { return Err(err); @@ -167,8 +106,11 @@ impl AliyunDriveWriter { let output: CreateResponse = serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?; self.write_file_id(output.file_id).await; + if output.exist.is_some_and(|x| x) { + return Err(Error::new(ErrorKind::AlreadyExists, "file exists")); + } - if output.upload_id.is_some() && output.rapid_upload.is_some_and(|x| !x) { + if output.upload_id.is_some() { if let Some(body) = body { let Some(part_info_list) = output.part_info_list else { return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url")); @@ -176,50 +118,41 @@ impl AliyunDriveWriter { if part_info_list.is_empty() { return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url")); } - self.core - .upload(&part_info_list[0].upload_url, body) - .await?; + if let Err(err) = self.core.upload(&part_info_list[0].upload_url, body).await { + if err.kind() != ErrorKind::AlreadyExists { + return Err(err); + } + } } } - Ok((false, output.upload_id)) + Ok(output.upload_id) } -} - -impl oio::MultipartWrite for AliyunDriveWriter { - async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> { - let upload_id = if self.core.rapid_upload { - let (rapid, mut upload_id) = self - .write(Some(size), Some(body.clone()), true, None) - .await?; - let size = if rapid { Some(size) } else { None }; - let (_, new_upload_id) = self.write(size, Some(body), false, None).await?; + async fn complete(&self, upload_id: &str) -> Result { + let file_id = self.read_file_id().await?; - if new_upload_id.is_some() { - upload_id = new_upload_id; - } + self.core.complete(&file_id, upload_id).await + } - let Some(upload_id) = upload_id else { - return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_id")); - }; - upload_id - } else { - let upload_id = self.initiate_part().await?; - self.write_part(&upload_id, 0, size, body).await?; - upload_id - }; + async fn delete(&self) -> Result<()> { let file_id = self.read_file_id().await?; - self.core.complete(&file_id, &upload_id).await?; + self.core.delete_path(&file_id).await + } +} + +impl oio::MultipartWrite for AliyunDriveWriter { + async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> { + let upload_id = self.initiate_part().await?; + self.write_part(&upload_id, 0, size, body).await?; + self.complete(&upload_id).await?; Ok(()) } async fn initiate_part(&self) -> Result { - let (_, upload_id) = self.write(None, None, false, None).await?; - - let Some(upload_id) = upload_id else { + let Some(upload_id) = self.write(None, None).await? else { return Err(Error::new(ErrorKind::Unsupported, "cannot find upload_id")); }; @@ -250,7 +183,7 @@ impl oio::MultipartWrite for AliyunDriveWriter { if part_info_list.is_empty() { return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url")); } - self.write(None, Some(body), false, Some(&part_info_list[0].upload_url)) + self.write(Some(body), Some(&part_info_list[0].upload_url)) .await?; Ok(oio::MultipartPart { @@ -261,15 +194,12 @@ impl oio::MultipartWrite for AliyunDriveWriter { } async fn complete_part(&self, upload_id: &str, _parts: &[oio::MultipartPart]) -> Result<()> { - let file_id = self.read_file_id().await?; - self.core.complete(&file_id, upload_id).await?; + self.complete(upload_id).await?; Ok(()) } async fn abort_part(&self, _upload_id: &str) -> Result<()> { - let file_id = self.read_file_id().await?; - - self.core.delete_path(&file_id).await + self.delete().await } }