From fab80c9601a8fd128176586e92a31d09f75062fb Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Sun, 3 Nov 2024 07:29:00 +0100 Subject: [PATCH] feat(core/services-azblob): support user defined metadata (#5274) --- core/src/raw/http_util/header.rs | 17 +++++++++++++++++ core/src/raw/http_util/mod.rs | 1 + core/src/services/azblob/backend.rs | 14 +++++++++++++- core/src/services/azblob/core.rs | 11 ++++++++++- core/src/services/oss/backend.rs | 4 +--- core/src/services/oss/core.rs | 29 ++++++----------------------- core/src/services/s3/backend.rs | 14 ++------------ core/src/services/s3/core.rs | 3 ++- 8 files changed, 52 insertions(+), 41 deletions(-) diff --git a/core/src/raw/http_util/header.rs b/core/src/raw/http_util/header.rs index 87748c96f4db..49dff81a0ff7 100644 --- a/core/src/raw/http_util/header.rs +++ b/core/src/raw/http_util/header.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; + use base64::engine::general_purpose; use base64::Engine; use chrono::DateTime; @@ -189,6 +191,21 @@ pub fn parse_into_metadata(path: &str, headers: &HeaderMap) -> Result<Metadata> Ok(m) } +/// Parse prefixed headers and return a map with the prefix of each header removed. +pub fn parse_prefixed_headers(headers: &HeaderMap, prefix: &str) -> HashMap<String, String> { + headers + .iter() + .filter_map(|(name, value)| { + name.as_str().strip_prefix(prefix).and_then(|stripped_key| { + value + .to_str() + .ok() + .map(|parsed_value| (stripped_key.to_string(), parsed_value.to_string())) + }) + }) + .collect() +} + /// format content md5 header by given input. 
pub fn format_content_md5(bs: &[u8]) -> String { let mut hasher = md5::Md5::new(); diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs index 226fb17b7d47..8999b6b807f0 100644 --- a/core/src/raw/http_util/mod.rs +++ b/core/src/raw/http_util/mod.rs @@ -49,6 +49,7 @@ pub use header::parse_header_to_str; pub use header::parse_into_metadata; pub use header::parse_last_modified; pub use header::parse_location; +pub use header::parse_prefixed_headers; mod uri; pub use uri::percent_decode_path; diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 8f905e3efe69..638513249582 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -33,6 +33,7 @@ use reqsign::AzureStorageSigner; use sha2::Digest; use sha2::Sha256; +use super::core::constants::X_MS_META_PREFIX; use super::error::parse_error; use super::lister::AzblobLister; use super::writer::AzblobWriter; @@ -517,6 +518,7 @@ impl Access for AzblobBackend { write_can_multi: true, write_with_cache_control: true, write_with_content_type: true, + write_with_user_metadata: true, delete: true, copy: true, @@ -545,7 +547,17 @@ impl Access for AzblobBackend { let status = resp.status(); match status { - StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new), + StatusCode::OK => { + let headers = resp.headers(); + let mut meta = parse_into_metadata(path, headers)?; + + let user_meta = parse_prefixed_headers(headers, X_MS_META_PREFIX); + if !user_meta.is_empty() { + meta.with_user_metadata(user_meta); + } + + Ok(RpStat::new(meta)) + } _ => Err(parse_error(resp)), } } diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs index c4c1925704bd..867f07651a4f 100644 --- a/core/src/services/azblob/core.rs +++ b/core/src/services/azblob/core.rs @@ -24,6 +24,7 @@ use std::time::Duration; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::Bytes; +use constants::X_MS_META_PREFIX; use 
http::header::HeaderName; use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; @@ -42,13 +43,14 @@ use uuid::Uuid; use crate::raw::*; use crate::*; -mod constants { +pub mod constants { pub const X_MS_VERSION: &str = "x-ms-version"; pub const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; pub const X_MS_COPY_SOURCE: &str = "x-ms-copy-source"; pub const X_MS_BLOB_CACHE_CONTROL: &str = "x-ms-blob-cache-control"; pub const X_MS_BLOB_CONDITION_APPENDPOS: &str = "x-ms-blob-condition-appendpos"; + pub const X_MS_META_PREFIX: &str = "x-ms-meta-"; // Server-side encryption with customer-provided headers pub const X_MS_ENCRYPTION_KEY: &str = "x-ms-encryption-key"; @@ -243,12 +245,19 @@ impl AzblobCore { let mut req = Request::put(&url); + if let Some(user_metadata) = args.user_metadata() { + for (key, value) in user_metadata { + req = req.header(format!("{X_MS_META_PREFIX}{key}"), value) + } + } + // Set SSE headers. req = self.insert_sse_headers(req); if let Some(cache_control) = args.cache_control() { req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control); } + if let Some(size) = size { req = req.header(CONTENT_LENGTH, size) } diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 7e2d67b3caf5..a627e1d4a0d8 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -489,9 +489,7 @@ impl Access for OssBackend { match status { StatusCode::OK => { let headers = resp.headers(); - let mut meta = - self.core - .parse_metadata(path, constants::X_OSS_META_PREFIX, resp.headers())?; + let mut meta = self.core.parse_metadata(path, resp.headers())?; if let Some(v) = parse_header_to_str(headers, "x-oss-version-id")? 
{ meta.set_version(v); diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index d4dff76348e3..2a471ef8ce10 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::fmt::Write; use std::time::Duration; use bytes::Bytes; +use constants::X_OSS_META_PREFIX; use http::header::CACHE_CONTROL; use http::header::CONTENT_DISPOSITION; use http::header::CONTENT_LENGTH; @@ -190,7 +190,7 @@ impl OssCore { "the format of the user metadata key is invalid, please refer the document", )); } - req = req.header(format!("{}{}", constants::X_OSS_META_PREFIX, key), value) + req = req.header(format!("{X_OSS_META_PREFIX}{key}"), value) } } @@ -213,28 +213,11 @@ impl OssCore { /// # Notes /// /// before return the user defined metadata, we'll strip the user_metadata_prefix from the key - pub fn parse_metadata( - &self, - path: &str, - user_metadata_prefix: &str, - headers: &HeaderMap, - ) -> Result<Metadata> { + pub fn parse_metadata(&self, path: &str, headers: &HeaderMap) -> Result<Metadata> { let mut m = parse_into_metadata(path, headers)?; - - let data: HashMap<String, String> = headers - .iter() - .filter_map(|(key, _)| { - key.as_str() - .strip_prefix(user_metadata_prefix) - .and_then(|stripped_key| { - parse_header_to_str(headers, key) - .unwrap_or(None) - .map(|val| (stripped_key.to_string(), val.to_string())) - }) - }) - .collect(); - if !data.is_empty() { - m.with_user_metadata(data); + let user_meta = parse_prefixed_headers(headers, X_OSS_META_PREFIX); + if !user_meta.is_empty() { + m.with_user_metadata(user_meta); } Ok(m) diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 57b3a2f2fa46..43f266e7c6a4 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use 
base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::Buf; +use constants::X_AMZ_META_PREFIX; use http::Response; use http::StatusCode; use log::debug; @@ -970,18 +971,7 @@ impl Access for S3Backend { let headers = resp.headers(); let mut meta = parse_into_metadata(path, headers)?; - let user_meta: HashMap<String, String> = headers - .iter() - .filter_map(|(name, _)| { - name.as_str() - .strip_prefix(constants::X_AMZ_META_PREFIX) - .and_then(|stripped_key| { - parse_header_to_str(headers, name) - .unwrap_or(None) - .map(|val| (stripped_key.to_string(), val.to_string())) - }) - }) - .collect(); + let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX); if !user_meta.is_empty() { meta.with_user_metadata(user_meta); } diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 79ba66239235..944dac8921b5 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -27,6 +27,7 @@ use std::time::Duration; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::Bytes; +use constants::X_AMZ_META_PREFIX; use http::header::HeaderName; use http::header::CACHE_CONTROL; use http::header::CONTENT_DISPOSITION; @@ -462,7 +463,7 @@ impl S3Core { // Set user metadata headers. if let Some(user_metadata) = args.user_metadata() { for (key, value) in user_metadata { - req = req.header(format!("{}{}", constants::X_AMZ_META_PREFIX, key), value) + req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value) } }