diff --git a/Cargo.lock b/Cargo.lock index 50ebc5e3e88..2a58beef671 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -891,33 +891,13 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" -dependencies = [ - "dirs-sys 0.3.7", -] - [[package]] name = "dirs" version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dece029acd3353e3a58ac2e3eb3c8d6c35827a892edc6cc4138ef9c33df46ecd" dependencies = [ - "dirs-sys 0.4.0", -] - -[[package]] -name = "dirs-sys" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" -dependencies = [ - "libc", - "redox_users", - "winapi", + "dirs-sys", ] [[package]] @@ -2187,7 +2167,7 @@ dependencies = [ "anyhow", "assert_cmd", "clap 4.1.11", - "dirs 5.0.0", + "dirs", "env_logger", "futures", "log", @@ -2251,8 +2231,7 @@ dependencies = [ "quick-xml 0.27.1", "rand 0.8.5", "redis", - "reqsign 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)", - "reqsign 0.8.5 (git+https://github.com/Xuanwo/reqsign?rev=fde88af3aecf4ba6c39e5d84dc39c5200f8f3a5e)", + "reqsign", "reqwest", "rocksdb", "serde", @@ -3131,46 +3110,15 @@ checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" [[package]] name = "reqsign" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7db6d8d2cd7fa61403d14de670f98d7cedac38143681c124943d7bb69258b3a" -dependencies = [ - "anyhow", - "backon", - "base64 0.21.0", - "bytes", - "dirs 4.0.0", - "form_urlencoded", - "hex", - "hmac", - "http", - "jsonwebtoken", - "log", - "once_cell", - "percent-encoding", - "quick-xml 0.28.1", - "rand 0.8.5", - "rsa", - "rust-ini", - "serde", - "serde_json", - "sha1", - "sha2", - "time 0.3.17", - "ureq", -] - -[[package]] -name = "reqsign" -version = "0.8.5" -source = "git+https://github.com/Xuanwo/reqsign?rev=fde88af3aecf4ba6c39e5d84dc39c5200f8f3a5e#fde88af3aecf4ba6c39e5d84dc39c5200f8f3a5e" +version = "0.9.0" +source = "git+https://github.com/Xuanwo/reqsign?rev=3707b084b534233a11cd34729dba8ec4f4c9e1fc#3707b084b534233a11cd34729dba8ec4f4c9e1fc" dependencies = [ "anyhow", "async-trait", "base64 0.21.0", "bytes", "chrono", - "dirs 5.0.0", + "dirs", "form_urlencoded", "hex", "hmac", diff --git a/core/Cargo.toml b/core/Cargo.toml index c3c167c51ec..b9099f4cefd 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -119,8 +119,7 @@ redis = { version = "0.22", features = [ "connection-manager", ], optional = true } # NOTE: we keep this for service migration one by one. And finally we will replace reqsign by v0.9. -reqsign = "0.8.5" -reqsign_0_9 = { package = "reqsign", git = "https://github.com/Xuanwo/reqsign", rev = "fde88af3aecf4ba6c39e5d84dc39c5200f8f3a5e" } +reqsign_0_9 = { package = "reqsign", git = "https://github.com/Xuanwo/reqsign", rev = "3707b084b534233a11cd34729dba8ec4f4c9e1fc" } reqwest = { version = "0.11.13", features = [ "multipart", "stream", diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 840956fd70a..ea07aa6c3d8 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -20,15 +20,14 @@ use std::fmt::Debug; use std::sync::Arc; use async_trait::async_trait; -use http::header::CONTENT_LENGTH; -use http::header::CONTENT_TYPE; -use http::Request; -use http::Response; use http::StatusCode; use http::Uri; use log::debug; -use reqsign::HuaweicloudObsSigner; +use reqsign_0_9::HuaweicloudObsConfig; +use reqsign_0_9::HuaweicloudObsCredentialLoader; +use reqsign_0_9::HuaweicloudObsSigner; +use super::core::ObsCore; use super::error::parse_error; use super::pager::ObsPager; use super::writer::ObsWriter; @@ -249,14 +248,13 @@ impl Builder for ObsBuilder { })? }; - let mut signer_builder = HuaweicloudObsSigner::builder(); - if let (Some(access_key_id), Some(secret_access_key)) = - (&self.access_key_id, &self.secret_access_key) - { - signer_builder - .access_key(access_key_id) - .secret_key(secret_access_key); - } + let config = HuaweicloudObsConfig { + access_key_id: self.access_key_id.take(), + secret_access_key: self.secret_access_key.take(), + security_token: None, + }; + + let cred_loader = HuaweicloudObsCredentialLoader::new(config); // Set the bucket name in CanonicalizedResource. // 1. If the bucket is bound to a user domain name, use the user domain name as the bucket name, @@ -265,25 +263,24 @@ impl Builder for ObsBuilder { // // Please refer to this doc for more details: // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0010.html - if is_obs_default { - signer_builder.bucket(&bucket); - } else { - signer_builder.bucket(&endpoint); - } - - let signer = signer_builder.build().map_err(|e| { - Error::new(ErrorKind::Unexpected, "build HuaweicloudObsSigner") - .with_context("service", Scheme::Obs) - .set_source(e) - })?; + let signer = HuaweicloudObsSigner::new({ + if is_obs_default { + &bucket + } else { + &endpoint + } + }); - debug!("backend build finished: {:?}", &self); + debug!("backend build finished"); Ok(ObsBackend { - client, - root, - endpoint: format!("{}://{}", &scheme, &endpoint), - signer: Arc::new(signer), - bucket, + core: Arc::new(ObsCore { + bucket, + root, + endpoint: format!("{}://{}", &scheme, &endpoint), + signer, + loader: cred_loader, + client, + }), }) } } @@ -291,11 +288,7 @@ impl Builder for ObsBuilder { /// Backend for Huaweicloud OBS services. #[derive(Debug, Clone)] pub struct ObsBackend { - pub client: HttpClient, - root: String, - endpoint: String, - pub signer: Arc, - bucket: String, + core: Arc, } #[async_trait] @@ -313,8 +306,8 @@ impl Accessor for ObsBackend { let mut am = AccessorInfo::default(); am.set_scheme(Scheme::Obs) - .set_root(&self.root) - .set_name(&self.bucket) + .set_root(&self.core.root) + .set_name(&self.core.bucket) .set_capabilities(Read | Write | Copy | List | Scan) .set_hints(ReadStreamable); @@ -322,11 +315,13 @@ impl Accessor for ObsBackend { } async fn create(&self, path: &str, _: OpCreate) -> Result { - let mut req = self.obs_put_object_request(path, Some(0), None, AsyncBody::Empty)?; + let mut req = self + .core + .obs_put_object_request(path, Some(0), None, AsyncBody::Empty)?; - self.signer.sign(&mut req).map_err(new_request_sign_error)?; + self.core.sign(&mut req).await?; - let resp = self.client.send(req).await?; + let resp = self.core.send(req).await?; let status = resp.status(); @@ -340,7 +335,7 @@ impl Accessor for ObsBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.obs_get_object(path, args.range()).await?; + let resp = self.core.obs_get_object(path, args.range()).await?; let status = resp.status(); @@ -363,12 +358,12 @@ impl Accessor for ObsBackend { Ok(( RpWrite::default(), - ObsWriter::new(self.clone(), args, path.to_string()), + ObsWriter::new(self.core.clone(), args, path.to_string()), )) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { - let resp = self.obs_copy_object(from, to).await?; + let resp = self.core.obs_copy_object(from, to).await?; let status = resp.status(); @@ -387,7 +382,7 @@ impl Accessor for ObsBackend { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); } - let resp = self.obs_get_head_object(path).await?; + let resp = self.core.obs_get_head_object(path).await?; let status = resp.status(); @@ -402,7 +397,7 @@ impl Accessor for ObsBackend { } async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.obs_delete_object(path).await?; + let resp = self.core.obs_delete_object(path).await?; let status = resp.status(); @@ -417,156 +412,14 @@ impl Accessor for ObsBackend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), - ObsPager::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + ObsPager::new(self.core.clone(), path, "/", args.limit()), )) } async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { Ok(( RpScan::default(), - ObsPager::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), + ObsPager::new(self.core.clone(), path, "", args.limit()), )) } } - -impl ObsBackend { - async fn obs_get_object( - &self, - path: &str, - range: BytesRange, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::get(&url); - - if !range.is_full() { - req = req.header(http::header::RANGE, range.to_header()) - } - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - pub fn obs_put_object_request( - &self, - path: &str, - size: Option, - content_type: Option<&str>, - body: AsyncBody, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size) - } - - if let Some(mime) = content_type { - req = req.header(CONTENT_TYPE, mime) - } - - let req = req.body(body).map_err(new_request_build_error)?; - - Ok(req) - } - - async fn obs_get_head_object(&self, path: &str) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - // The header 'Origin' is optional for API calling, the doc has mistake, confirmed with customer service of huaweicloud. - // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0084.html - - let req = Request::head(&url); - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn obs_delete_object(&self, path: &str) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let req = Request::delete(&url); - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn obs_copy_object(&self, from: &str, to: &str) -> Result> { - let source = build_abs_path(&self.root, from); - let target = build_abs_path(&self.root, to); - - let source = format!("/{}/{}", self.bucket, percent_encode_path(&source)); - let url = format!("{}/{}", self.endpoint, percent_encode_path(&target)); - - let mut req = Request::put(&url) - .header("x-obs-copy-source", percent_encode_path(&source)) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - pub(crate) async fn obs_list_objects( - &self, - path: &str, - next_marker: &str, - delimiter: &str, - limit: Option, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let mut queries = vec![]; - if !path.is_empty() { - queries.push(format!("prefix={}", percent_encode_path(&p))); - } - if !delimiter.is_empty() { - queries.push(format!("delimiter={delimiter}")); - } - if let Some(limit) = limit { - queries.push(format!("max-keys={limit}")); - } - if !next_marker.is_empty() { - queries.push(format!("marker={next_marker}")); - } - - let url = if queries.is_empty() { - self.endpoint.to_string() - } else { - format!("{}?{}", self.endpoint, queries.join("&")) - }; - - let mut req = Request::get(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } -} diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs new file mode 100644 index 00000000000..496440620ad --- /dev/null +++ b/core/src/services/obs/core.rs @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; + +use crate::raw::*; +use crate::*; +use http::header::CONTENT_LENGTH; +use http::header::CONTENT_TYPE; +use http::Request; +use http::Response; +use reqsign_0_9::HuaweicloudObsCredential; +use reqsign_0_9::HuaweicloudObsCredentialLoader; +use reqsign_0_9::HuaweicloudObsSigner; + +pub struct ObsCore { + pub bucket: String, + pub root: String, + pub endpoint: String, + + pub signer: HuaweicloudObsSigner, + pub loader: HuaweicloudObsCredentialLoader, + pub client: HttpClient, +} + +impl Debug for ObsCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Backend") + .field("root", &self.root) + .field("bucket", &self.bucket) + .field("endpoint", &self.endpoint) + .finish_non_exhaustive() + } +} + +impl ObsCore { + async fn load_credential(&self) -> Result> { + let cred = self + .loader + .load() + .await + .map_err(new_request_credential_error)?; + + if let Some(cred) = cred { + Ok(Some(cred)) + } else { + Ok(None) + } + } + + pub async fn sign(&self, req: &mut Request) -> Result<()> { + let cred = if let Some(cred) = self.load_credential().await? { + cred + } else { + return Ok(()); + }; + + self.signer.sign(req, &cred).map_err(new_request_sign_error) + } + + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } +} + +impl ObsCore { + pub async fn obs_get_object( + &self, + path: &str, + range: BytesRange, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + + let mut req = Request::get(&url); + + if !range.is_full() { + req = req.header(http::header::RANGE, range.to_header()) + } + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub fn obs_put_object_request( + &self, + path: &str, + size: Option, + content_type: Option<&str>, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + + let mut req = Request::put(&url); + + if let Some(size) = size { + req = req.header(CONTENT_LENGTH, size) + } + + if let Some(mime) = content_type { + req = req.header(CONTENT_TYPE, mime) + } + + let req = req.body(body).map_err(new_request_build_error)?; + + Ok(req) + } + + pub async fn obs_get_head_object(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + + // The header 'Origin' is optional for API calling, the doc has mistake, confirmed with customer service of huaweicloud. + // https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0084.html + + let req = Request::head(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn obs_delete_object(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + + let req = Request::delete(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn obs_copy_object( + &self, + from: &str, + to: &str, + ) -> Result> { + let source = build_abs_path(&self.root, from); + let target = build_abs_path(&self.root, to); + + let source = format!("/{}/{}", self.bucket, percent_encode_path(&source)); + let url = format!("{}/{}", self.endpoint, percent_encode_path(&target)); + + let mut req = Request::put(&url) + .header("x-obs-copy-source", percent_encode_path(&source)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn obs_list_objects( + &self, + path: &str, + next_marker: &str, + delimiter: &str, + limit: Option, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let mut queries = vec![]; + if !path.is_empty() { + queries.push(format!("prefix={}", percent_encode_path(&p))); + } + if !delimiter.is_empty() { + queries.push(format!("delimiter={delimiter}")); + } + if let Some(limit) = limit { + queries.push(format!("max-keys={limit}")); + } + if !next_marker.is_empty() { + queries.push(format!("marker={next_marker}")); + } + + let url = if queries.is_empty() { + self.endpoint.to_string() + } else { + format!("{}?{}", self.endpoint, queries.join("&")) + }; + + let mut req = Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } +} diff --git a/core/src/services/obs/mod.rs b/core/src/services/obs/mod.rs index c7cd5b5c13b..9e44b65ef0c 100644 --- a/core/src/services/obs/mod.rs +++ b/core/src/services/obs/mod.rs @@ -18,6 +18,7 @@ mod backend; pub use backend::ObsBuilder as Obs; +mod core; mod error; mod pager; mod writer; diff --git a/core/src/services/obs/pager.rs b/core/src/services/obs/pager.rs index e373c5b2a5e..e24f4f6a63e 100644 --- a/core/src/services/obs/pager.rs +++ b/core/src/services/obs/pager.rs @@ -22,7 +22,7 @@ use bytes::Buf; use quick_xml::de; use serde::Deserialize; -use super::backend::ObsBackend; +use super::core::ObsCore; use super::error::parse_error; use crate::raw::*; use crate::EntryMode; @@ -32,8 +32,7 @@ use crate::Metadata; use crate::Result; pub struct ObsPager { - backend: Arc, - root: String, + core: Arc, path: String, delimiter: String, limit: Option, @@ -43,16 +42,9 @@ pub struct ObsPager { } impl ObsPager { - pub fn new( - backend: Arc, - root: &str, - path: &str, - delimiter: &str, - limit: Option, - ) -> Self { + pub fn new(core: Arc, path: &str, delimiter: &str, limit: Option) -> Self { Self { - backend, - root: root.to_string(), + core, path: path.to_string(), delimiter: delimiter.to_string(), limit, @@ -71,7 +63,7 @@ impl oio::Page for ObsPager { } let resp = self - .backend + .core .obs_list_objects(&self.path, &self.next_marker, &self.delimiter, self.limit) .await?; @@ -98,7 +90,7 @@ impl oio::Page for ObsPager { for prefix in common_prefixes { let de = oio::Entry::new( - &build_rel_path(&self.root, &prefix.prefix), + &build_rel_path(&self.core.root, &prefix.prefix), Metadata::new(EntryMode::DIR), ); @@ -112,7 +104,7 @@ impl oio::Page for ObsPager { let meta = Metadata::new(EntryMode::FILE).with_content_length(object.size); - let de = oio::Entry::new(&build_rel_path(&self.root, &object.key), meta); + let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.key), meta); entries.push(de); } diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index df7db515698..9e410a35bb7 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -15,45 +15,44 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use async_trait::async_trait; use bytes::Bytes; use http::StatusCode; -use super::backend::ObsBackend; +use super::core::ObsCore; use super::error::parse_error; use crate::ops::OpWrite; use crate::raw::*; use crate::*; pub struct ObsWriter { - backend: ObsBackend, + core: Arc, op: OpWrite, path: String, } impl ObsWriter { - pub fn new(backend: ObsBackend, op: OpWrite, path: String) -> Self { - ObsWriter { backend, op, path } + pub fn new(core: Arc, op: OpWrite, path: String) -> Self { + ObsWriter { core, op, path } } } #[async_trait] impl oio::Write for ObsWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - let mut req = self.backend.obs_put_object_request( + let mut req = self.core.obs_put_object_request( &self.path, Some(bs.len()), self.op.content_type(), AsyncBody::Bytes(bs), )?; - self.backend - .signer - .sign(&mut req) - .map_err(new_request_sign_error)?; + self.core.sign(&mut req).await?; - let resp = self.backend.client.send(req).await?; + let resp = self.core.send(req).await?; let status = resp.status();