From ada9062b0dc884a6d9eac78ea88ffdcb76e538e4 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Tue, 11 Apr 2023 13:56:15 +0800
Subject: [PATCH] refactor(services/azdfs): Migrate to async reqsign (#1903)

Signed-off-by: Xuanwo
---
 core/src/services/azdfs/backend.rs | 300 +++++------------------------
 core/src/services/azdfs/core.rs    | 270 ++++++++++++++++++++++++++
 core/src/services/azdfs/mod.rs     |   1 +
 core/src/services/azdfs/pager.rs   |  20 +-
 core/src/services/azdfs/writer.rs  |  28 ++-
 5 files changed, 334 insertions(+), 285 deletions(-)
 create mode 100644 core/src/services/azdfs/core.rs

diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs
index 5a810ac207c..62c0ef64ad4 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -18,19 +18,16 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
-use std::fmt::Write;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use http::header::CONTENT_DISPOSITION;
-use http::header::CONTENT_LENGTH;
-use http::header::CONTENT_TYPE;
-use http::Request;
-use http::Response;
 use http::StatusCode;
 use log::debug;
-use reqsign::AzureStorageSigner;
+use reqsign_0_9::AzureStorageConfig;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
 
+use super::core::AzdfsCore;
 use super::error::parse_error;
 use super::pager::AzdfsPager;
 use super::writer::AzdfsWriter;
@@ -252,35 +249,28 @@ impl Builder for AzdfsBuilder {
             })?
         };
 
-        let mut signer_builder = AzureStorageSigner::builder();
-        let mut account_name = None;
-        if let (Some(name), Some(key)) = (&self.account_name, &self.account_key) {
-            account_name = Some(name.clone());
-            signer_builder.account_name(name).account_key(key);
-        } else if let Some(key) = &self.account_key {
-            account_name = infer_storage_name_from_endpoint(endpoint.as_str());
-            signer_builder
-                .account_name(account_name.as_ref().unwrap_or(&String::new()))
-                .account_key(key);
-        }
+        let config_loader = AzureStorageConfig {
+            account_name: self
+                .account_name
+                .clone()
+                .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str())),
+            account_key: self.account_key.clone(),
+            sas_token: None,
+        };
 
-        let signer = signer_builder.build().map_err(|e| {
-            Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner")
-                .with_operation("Builder::build")
-                .with_context("service", Scheme::Azdfs)
-                .with_context("endpoint", &endpoint)
-                .with_context("container", filesystem.as_str())
-                .set_source(e)
-        })?;
+        let cred_loader = AzureStorageLoader::new(config_loader);
+        let signer = AzureStorageSigner::new();
 
         debug!("backend build finished: {:?}", &self);
         Ok(AzdfsBackend {
-            root,
-            endpoint,
-            signer: Arc::new(signer),
-            filesystem: self.filesystem.clone(),
-            client,
-            _account_name: account_name.unwrap_or_default(),
+            core: Arc::new(AzdfsCore {
+                filesystem: self.filesystem.clone(),
+                root,
+                endpoint,
+                client,
+                loader: cred_loader,
+                signer,
+            }),
         })
     }
 
@@ -300,13 +290,7 @@ impl Builder for AzdfsBuilder {
 /// Backend for azblob services.
 #[derive(Debug, Clone)]
 pub struct AzdfsBackend {
-    filesystem: String,
-    // TODO: remove pub after https://github.com/apache/incubator-opendal/issues/1427
-    pub client: HttpClient,
-    root: String, // root will be "/" or /abc/
-    endpoint: String,
-    pub signer: Arc<AzureStorageSigner>,
-    _account_name: String,
+    core: Arc<AzdfsCore>,
 }
 
 #[async_trait]
@@ -321,8 +305,8 @@ impl Accessor for AzdfsBackend {
     fn info(&self) -> AccessorInfo {
         let mut am = AccessorInfo::default();
         am.set_scheme(Scheme::Azdfs)
-            .set_root(&self.root)
-            .set_name(&self.filesystem)
+            .set_root(&self.core.root)
+            .set_name(&self.core.filesystem)
             .set_capabilities(
                 AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List,
             )
@@ -338,11 +322,13 @@
             _ => unimplemented!("not supported object mode"),
         };
 
-        let mut req = self.azdfs_create_request(path, resource, None, None, AsyncBody::Empty)?;
+        let mut req =
+            self.core
+                .azdfs_create_request(path, resource, None, None, AsyncBody::Empty)?;
 
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
 
@@ -356,7 +342,7 @@
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let resp = self.azdfs_read(path, args.range()).await?;
+        let resp = self.core.azdfs_read(path, args.range()).await?;
 
         let status = resp.status();
 
@@ -379,7 +365,7 @@
         Ok((
             RpWrite::default(),
-            AzdfsWriter::new(self.clone(), args, path.to_string()),
+            AzdfsWriter::new(self.core.clone(), args, path.to_string()),
         ))
     }
 
@@ -389,7 +375,7 @@
             return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
         }
 
-        let resp = self.azdfs_get_properties(path).await?;
+        let resp = self.core.azdfs_get_properties(path).await?;
 
         let status = resp.status();
 
@@ -403,7 +389,7 @@
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let resp = self.azdfs_delete(path).await?;
+        let resp = self.core.azdfs_delete(path).await?;
 
         let status = resp.status();
 
@@ -414,216 +400,19 @@
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
-        let op = AzdfsPager::new(
-            Arc::new(self.clone()),
-            self.root.clone(),
-            path.to_string(),
-            args.limit(),
-        );
+        let op = AzdfsPager::new(self.core.clone(), path.to_string(), args.limit());
 
         Ok((RpList::default(), op))
     }
 }
 
-impl AzdfsBackend {
-    async fn azdfs_read(
-        &self,
-        path: &str,
-        range: BytesRange,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::get(&url);
-
-        if !range.is_full() {
-            // azblob doesn't support read with suffix range.
-            //
-            // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
-            if range.offset().is_none() && range.size().is_some() {
-                return Err(Error::new(
-                    ErrorKind::Unsupported,
-                    "azblob doesn't support read with suffix range",
-                ));
-            }
-
-            req = req.header(http::header::RANGE, range.to_header());
-        }
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    /// resource should be one of `file` or `directory`
-    ///
-    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
-    pub fn azdfs_create_request(
-        &self,
-        path: &str,
-        resource: &str,
-        content_type: Option<&str>,
-        content_disposition: Option<&str>,
-        body: AsyncBody,
-    ) -> Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let url = format!(
-            "{}/{}/{}?resource={resource}",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::put(&url);
-
-        // Content length must be 0 for create request.
-        req = req.header(CONTENT_LENGTH, 0);
-
-        if let Some(ty) = content_type {
-            req = req.header(CONTENT_TYPE, ty)
-        }
-
-        if let Some(pos) = content_disposition {
-            req = req.header(CONTENT_DISPOSITION, pos)
-        }
-
-        // Set body
-        let req = req.body(body).map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
-    pub fn azdfs_update_request(
-        &self,
-        path: &str,
-        size: Option<u64>,
-        body: AsyncBody,
-    ) -> Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        // - close: Make this is the final action to this file.
-        // - flush: Flush the file directly.
-        let url = format!(
-            "{}/{}/{}?action=append&close=true&flush=true&position=0",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::patch(&url);
-
-        if let Some(size) = size {
-            req = req.header(CONTENT_LENGTH, size)
-        }
-
-        // Set body
-        let req = req.body(body).map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    async fn azdfs_get_properties(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let url = format!(
-            "{}/{}/{}?action=getStatus",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::head(&url);
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    async fn azdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let url = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::delete(&url);
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    pub(crate) async fn azdfs_list(
-        &self,
-        path: &str,
-        continuation: &str,
-        limit: Option<usize>,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let mut url = format!(
-            "{}/{}?resource=filesystem&recursive=false",
-            self.endpoint, self.filesystem
-        );
-        if !p.is_empty() {
-            write!(url, "&directory={}", percent_encode_path(&p))
-                .expect("write into string must succeed");
-        }
-        if let Some(limit) = limit {
-            write!(url, "&maxresults={limit}").expect("write into string must succeed");
-        }
-        if !continuation.is_empty() {
-            write!(url, "&continuation={continuation}").expect("write into string must succeed");
-        }
-
-        let mut req = Request::get(&url)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-}
-
 fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
-    let _endpoint: &str = endpoint
+    let endpoint: &str = endpoint
         .strip_prefix("http://")
         .or_else(|| endpoint.strip_prefix("https://"))
         .unwrap_or(endpoint);
 
-    let mut parts = _endpoint.splitn(2, '.');
+    let mut parts = endpoint.splitn(2, '.');
     let storage_name = parts.next();
     let endpoint_suffix = parts
         .next()
@@ -672,13 +461,11 @@ mod tests {
             .expect("build azdfs should be succeeded.");
 
         assert_eq!(
-            azdfs.endpoint,
+            azdfs.core.endpoint,
             "https://storagesample.dfs.core.chinacloudapi.cn"
         );
 
-        assert_eq!(azdfs._account_name, "storagesample".to_string());
-
-        assert_eq!(azdfs.filesystem, "filesystem".to_string());
+        assert_eq!(azdfs.core.filesystem, "filesystem".to_string());
 
         assert_eq!(
             azdfs_builder.account_key.unwrap(),
@@ -695,11 +482,12 @@ mod tests {
             .build()
             .expect("build azdfs should be succeeded.");
 
-        assert_eq!(azdfs.endpoint, "https://storagesample.dfs.core.windows.net");
-
-        assert_eq!(azdfs._account_name, "".to_string());
+        assert_eq!(
+            azdfs.core.endpoint,
+            "https://storagesample.dfs.core.windows.net"
+        );
 
-        assert_eq!(azdfs.filesystem, "filesystem".to_string());
+        assert_eq!(azdfs.core.filesystem, "filesystem".to_string());
 
         assert_eq!(azdfs_builder.account_key, None);
     }
diff --git a/core/src/services/azdfs/core.rs b/core/src/services/azdfs/core.rs
new file mode 100644
index 00000000000..2b90511249e
--- /dev/null
+++ b/core/src/services/azdfs/core.rs
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+
+use http::header::CONTENT_DISPOSITION;
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::Request;
+use http::Response;
+use reqsign_0_9::AzureStorageCredential;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
+
+use crate::raw::*;
+use crate::*;
+
+pub struct AzdfsCore {
+    pub filesystem: String,
+    pub root: String,
+    pub endpoint: String,
+
+    pub client: HttpClient,
+    pub loader: AzureStorageLoader,
+    pub signer: AzureStorageSigner,
+}
+
+impl Debug for AzdfsCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("AzdfsCore")
+            .field("filesystem", &self.filesystem)
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+impl AzdfsCore {
+    async fn load_credential(&self) -> Result<AzureStorageCredential> {
+        let cred = self
+            .loader
+            .load()
+            .await
+            .map_err(new_request_credential_error)?;
+
+        if let Some(cred) = cred {
+            Ok(cred)
+        } else {
+            Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "no valid credential found",
+            ))
+        }
+    }
+
+    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+        let cred = self.load_credential().await?;
+        self.signer.sign(req, &cred).map_err(new_request_sign_error)
+    }
+
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+}
+
+impl AzdfsCore {
+    pub async fn azdfs_read(
+        &self,
+        path: &str,
+        range: BytesRange,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::get(&url);
+
+        if !range.is_full() {
+            // azblob doesn't support read with suffix range.
+            //
+            // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
+            if range.offset().is_none() && range.size().is_some() {
+                return Err(Error::new(
+                    ErrorKind::Unsupported,
+                    "azblob doesn't support read with suffix range",
+                ));
+            }
+
+            req = req.header(http::header::RANGE, range.to_header());
+        }
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    /// resource should be one of `file` or `directory`
+    ///
+    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
+    pub fn azdfs_create_request(
+        &self,
+        path: &str,
+        resource: &str,
+        content_type: Option<&str>,
+        content_disposition: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let url = format!(
+            "{}/{}/{}?resource={resource}",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::put(&url);
+
+        // Content length must be 0 for create request.
+        req = req.header(CONTENT_LENGTH, 0);
+
+        if let Some(ty) = content_type {
+            req = req.header(CONTENT_TYPE, ty)
+        }
+
+        if let Some(pos) = content_disposition {
+            req = req.header(CONTENT_DISPOSITION, pos)
+        }
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
+    pub fn azdfs_update_request(
+        &self,
+        path: &str,
+        size: Option<u64>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        // - close: Make this is the final action to this file.
+        // - flush: Flush the file directly.
+        let url = format!(
+            "{}/{}/{}?action=append&close=true&flush=true&position=0",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::patch(&url);
+
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size)
+        }
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub async fn azdfs_get_properties(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let url = format!(
+            "{}/{}/{}?action=getStatus",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::head(&url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.client.send(req).await
+    }
+
+    pub async fn azdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let url = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::delete(&url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub async fn azdfs_list(
+        &self,
+        path: &str,
+        continuation: &str,
+        limit: Option<usize>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let mut url = format!(
+            "{}/{}?resource=filesystem&recursive=false",
+            self.endpoint, self.filesystem
+        );
+        if !p.is_empty() {
+            write!(url, "&directory={}", percent_encode_path(&p))
+                .expect("write into string must succeed");
+        }
+        if let Some(limit) = limit {
+            write!(url, "&maxresults={limit}").expect("write into string must succeed");
+        }
+        if !continuation.is_empty() {
+            write!(url, "&continuation={continuation}").expect("write into string must succeed");
+        }
+
+        let mut req = Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+}
diff --git a/core/src/services/azdfs/mod.rs b/core/src/services/azdfs/mod.rs
index 730bd99e47f..db015ca9540 100644
--- a/core/src/services/azdfs/mod.rs
+++ b/core/src/services/azdfs/mod.rs
@@ -18,6 +18,7 @@
 mod backend;
 pub use backend::AzdfsBuilder as Azdfs;
 
+mod core;
 mod error;
 mod pager;
 mod writer;
diff --git a/core/src/services/azdfs/pager.rs b/core/src/services/azdfs/pager.rs
index 5f856a0b1e9..4f414a04ce5 100644
--- a/core/src/services/azdfs/pager.rs
+++ b/core/src/services/azdfs/pager.rs
@@ -23,14 +23,14 @@
 use serde_json::de;
 use time::format_description::well_known::Rfc2822;
 use time::OffsetDateTime;
 
-use super::backend::AzdfsBackend;
+use super::core::AzdfsCore;
 use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
 
 pub struct AzdfsPager {
-    backend: Arc<AzdfsBackend>,
-    root: String,
+    core: Arc<AzdfsCore>,
+
     path: String,
     limit: Option<usize>,
@@ -39,15 +39,9 @@ pub struct AzdfsPager {
 }
 
 impl AzdfsPager {
-    pub fn new(
-        backend: Arc<AzdfsBackend>,
-        root: String,
-        path: String,
-        limit: Option<usize>,
-    ) -> Self {
+    pub fn new(core: Arc<AzdfsCore>, path: String, limit: Option<usize>) -> Self {
         Self {
-            backend,
-            root,
+            core,
             path,
             limit,
@@ -65,7 +59,7 @@ impl oio::Page for AzdfsPager {
         }
 
         let resp = self
-            .backend
+            .core
             .azdfs_list(&self.path, &self.continuation, self.limit)
             .await?;
@@ -123,7 +117,7 @@ impl oio::Page for AzdfsPager {
                 })?,
             );
 
-            let mut path = build_rel_path(&self.root, &object.name);
+            let mut path = build_rel_path(&self.core.root, &object.name);
             if mode == EntryMode::DIR {
                path += "/"
             };
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 2d85f9fb652..460f14876ef 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -15,33 +15,35 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use http::StatusCode;
 
-use super::backend::AzdfsBackend;
+use super::core::AzdfsCore;
 use super::error::parse_error;
 use crate::ops::OpWrite;
 use crate::raw::*;
 use crate::*;
 
 pub struct AzdfsWriter {
-    backend: AzdfsBackend,
+    core: Arc<AzdfsCore>,
 
     op: OpWrite,
     path: String,
 }
 
 impl AzdfsWriter {
-    pub fn new(backend: AzdfsBackend, op: OpWrite, path: String) -> Self {
-        AzdfsWriter { backend, op, path }
+    pub fn new(core: Arc<AzdfsCore>, op: OpWrite, path: String) -> Self {
+        AzdfsWriter { core, op, path }
     }
 }
 
 #[async_trait]
 impl oio::Write for AzdfsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let mut req = self.backend.azdfs_create_request(
+        let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
             self.op.content_type(),
@@ -49,12 +51,9 @@ impl oio::Write for AzdfsWriter {
             AsyncBody::Empty,
         )?;
 
-        self.backend
-            .signer
-            .sign(&mut req)
-            .map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.backend.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
         match status {
@@ -69,15 +68,12 @@ impl oio::Write for AzdfsWriter {
         }
 
         let mut req =
-            self.backend
+            self.core
                 .azdfs_update_request(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))?;
 
-        self.backend
-            .signer
-            .sign(&mut req)
-            .map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.backend.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
         match status {
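
Note on the signing flow introduced by this patch: reqsign 0.9 splits the old builder-style signer into a static `AzureStorageConfig`, an `AzureStorageLoader` that resolves that config into a credential, and a stateless `AzureStorageSigner`. Credential loading is asynchronous, which is why `AzdfsCore::sign` is an `async fn` and every call site gained an `.await`. The sketch below shows the flow in isolation; it mirrors the code above but is not part of the patch. The endpoint and account values are placeholders, the `tokio`/`anyhow` scaffolding exists only to make the example runnable, and the `reqsign_0_9` path follows the dependency alias used in this diff (the crate itself is published as `reqsign`).

    use anyhow::Result;
    use http::Request;
    use reqsign_0_9::AzureStorageConfig;
    use reqsign_0_9::AzureStorageLoader;
    use reqsign_0_9::AzureStorageSigner;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Static configuration; the loader resolves it into a credential on demand.
        let config = AzureStorageConfig {
            account_name: Some("storagesample".to_string()),
            account_key: Some("base64-account-key".to_string()), // placeholder, not a real key
            sas_token: None,
        };
        let loader = AzureStorageLoader::new(config);
        let signer = AzureStorageSigner::new();

        // Loading is async and may find no usable credential, mirroring
        // AzdfsCore::load_credential above.
        let cred = loader
            .load()
            .await?
            .expect("no valid credential found");

        // Signing mutates the request in place, as AzdfsCore::sign does.
        let mut req = Request::get(
            "https://storagesample.dfs.core.windows.net/filesystem/path/to/file",
        )
        .body(())?;
        signer.sign(&mut req, &cred)?;

        println!("authorization: {:?}", req.headers().get("authorization"));
        Ok(())
    }

Inside the backend, the loader and signer live together in `AzdfsCore`, so the pager and writer only need an `Arc<AzdfsCore>` instead of cloning the whole `AzdfsBackend`.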