diff --git a/server/Cargo.toml b/server/Cargo.toml
index 08bbd8a71..2f1a8bcac 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -16,7 +16,7 @@ arrow-json = "53.0.0"
 arrow-ipc = { version = "53.0.0", features = ["zstd"] }
 arrow-select = "53.0.0"
 datafusion = "42.0.0"
-object_store = { version = "0.11.0", features = ["cloud", "aws"] }
+object_store = { version = "0.11.0", features = ["cloud", "aws", "azure"] }
 parquet = "53.0.0"
 arrow-flight = { version = "53.0.0", features = [ "tls" ] }
 tonic = {version = "0.12.3", features = ["tls", "transport", "gzip", "zstd"] }
diff --git a/server/src/metrics/storage.rs b/server/src/metrics/storage.rs
index abb06aa94..a91c431cb 100644
--- a/server/src/metrics/storage.rs
+++ b/server/src/metrics/storage.rs
@@ -86,3 +86,42 @@ pub mod s3 {
         }
     }
 }
+
+pub mod azureblob {
+    use crate::{metrics::METRICS_NAMESPACE, storage::AzureBlobConfig};
+    use once_cell::sync::Lazy;
+    use prometheus::{HistogramOpts, HistogramVec};
+
+    use super::StorageMetrics;
+
+    pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+        HistogramVec::new(
+            HistogramOpts::new("azr_blob_response_time", "AzureBlob Request Latency")
+                .namespace(METRICS_NAMESPACE),
+            &["method", "status"],
+        )
+        .expect("metric can be created")
+    });
+
+    pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+        HistogramVec::new(
+            HistogramOpts::new("query_azr_blob_response_time", "AzureBlob Request Latency")
+                .namespace(METRICS_NAMESPACE),
+            &["method", "status"],
+        )
+        .expect("metric can be created")
+    });
+
+    impl StorageMetrics for AzureBlobConfig {
+        fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+            handler
+                .registry
+                .register(Box::new(REQUEST_RESPONSE_TIME.clone()))
+                .expect("metric can be registered");
+            handler
+                .registry
+                .register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone()))
+                .expect("metric can be registered");
+        }
+    }
+}
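The new `azureblob` module mirrors the existing `s3` metrics module: each histogram is a lazily-initialized `HistogramVec`, registered once against the actix-web-prometheus registry. A minimal sketch of how the call sites added later in this patch record an observation; the helper function here is hypothetical, but the ("method", "status") label scheme and the seconds-as-f64 timing are taken from the patch:

    use std::time::Instant;

    // Hypothetical helper: time an operation, then record the result under
    // the ("method", "status") labels this patch uses at every call site.
    fn observe<T, E>(method: &str, result: &Result<T, E>, start: Instant) {
        let status = if result.is_ok() { "200" } else { "400" };
        azureblob::REQUEST_RESPONSE_TIME
            .with_label_values(&[method, status])
            .observe(start.elapsed().as_secs_f64());
    }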
diff --git a/server/src/option.rs b/server/src/option.rs
index 00a699752..4471ae56b 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -18,7 +18,9 @@
 
 use crate::cli::Cli;
 use crate::storage::object_storage::parseable_json_path;
-use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
+use crate::storage::{
+    AzureBlobConfig, FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config,
+};
 use bytes::Bytes;
 use clap::error::ErrorKind;
 use clap::{command, Args, Command, FromArgMatches};
@@ -105,6 +107,22 @@ Cloud Native, log analytics platform for modern applications."#,
                     storage_name: "s3",
                 }
             }
+            Some(("azure-blob", m)) => {
+                let cli = match Cli::from_arg_matches(m) {
+                    Ok(cli) => cli,
+                    Err(err) => err.exit(),
+                };
+                let storage = match AzureBlobConfig::from_arg_matches(m) {
+                    Ok(storage) => storage,
+                    Err(err) => err.exit(),
+                };
+
+                Config {
+                    parseable: cli,
+                    storage: Arc::new(storage),
+                    storage_name: "azure_blob",
+                }
+            }
             _ => unreachable!(),
         }
     }
@@ -163,11 +181,16 @@
     // returns the string representation of the storage mode
    // drive --> Local drive
    // s3 --> S3 bucket
+   // azure_blob --> Azure Blob Storage
     pub fn get_storage_mode_string(&self) -> &str {
         if self.storage_name == "drive" {
             return "Local drive";
+        } else if self.storage_name == "s3" {
+            return "S3 bucket";
+        } else if self.storage_name == "azure_blob" {
+            return "Azure Blob Storage";
         }
-        "S3 bucket"
+        "Unknown"
     }
 
     pub fn get_server_mode_string(&self) -> &str {
@@ -193,6 +216,9 @@ fn create_parseable_cli_command() -> Command {
     let s3 = Cli::create_cli_command_with_clap("s3-store");
     let s3 = <S3Config as Args>::augment_args_for_update(s3);
+    let azureblob = Cli::create_cli_command_with_clap("azure-blob");
+    let azureblob = <AzureBlobConfig as Args>::augment_args_for_update(azureblob);
+
     command!()
         .name("Parseable")
         .bin_name("parseable")
@@ -207,7 +233,7 @@ Join the community at https://logg.ing/community.
 "#,
         )
         .subcommand_required(true)
-        .subcommands([local, s3])
+        .subcommands([local, s3, azureblob])
 }
 
 #[derive(Debug, Default, Eq, PartialEq)]
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 040c7ae26..a018c2b1c 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -24,6 +24,7 @@
 use chrono::Local;
 use std::fmt::Debug;
 
+mod azure_blob;
 mod localfs;
 mod metrics_layer;
 pub(crate) mod object_storage;
@@ -34,6 +35,7 @@ mod store_metadata;
 
 use self::retention::Retention;
 pub use self::staging::StorageDir;
+pub use azure_blob::AzureBlobConfig;
 pub use localfs::FSConfig;
 pub use object_storage::{ObjectStorage, ObjectStorageProvider};
 pub use s3::S3Config;
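With the subcommand registered above and the `AzureBlobConfig` flags defined in the new `azure_blob` module below, starting the server against Azure Blob Storage should look roughly like this (account, key, and container values are placeholders; each flag can also be supplied via its `P_AZR_*` environment variable, as declared on the struct fields):

    P_AZR_URL=https://mystorageaccount.blob.core.windows.net \
    P_AZR_ACCOUNT=mystorageaccount \
    P_AZR_ACCESS_KEY=<access-key> \
    P_AZR_CONTAINER=parseable \
    parseable azure-blob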
diff --git a/server/src/storage/azure_blob.rs b/server/src/storage/azure_blob.rs
new file mode 100644
index 000000000..1e4e8a807
--- /dev/null
+++ b/server/src/storage/azure_blob.rs
@@ -0,0 +1,746 @@
+use bytes::Bytes;
+use datafusion::datasource::listing::ListingTableUrl;
+use futures::stream::FuturesUnordered;
+use futures::{StreamExt, TryStreamExt};
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+use super::object_storage::parseable_json_path;
+use super::{
+    LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
+    SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+};
+use async_trait::async_trait;
+use datafusion::datasource::object_store::{
+    DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
+};
+use datafusion::execution::runtime_env::RuntimeConfig;
+use object_store::azure::MicrosoftAzureBuilder;
+use object_store::{ClientOptions, ObjectStore, PutPayload};
+use relative_path::{RelativePath, RelativePathBuf};
+use std::path::Path as StdPath;
+use url::Url;
+
+use super::metrics_layer::MetricLayer;
+use crate::handlers::http::users::USERS_ROOT_DIR;
+use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
+use crate::metrics::storage::StorageMetrics;
+use object_store::limit::LimitStore;
+use object_store::path::Path as StorePath;
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+const CONNECT_TIMEOUT_SECS: u64 = 5;
+const REQUEST_TIMEOUT_SECS: u64 = 300;
+
+#[derive(Debug, Clone, clap::Args)]
+#[command(
+    name = "Azure config",
+    about = "Start Parseable with Azure Blob storage",
+    help_template = "\
+{about-section}
+{all-args}
+"
+)]
+pub struct AzureBlobConfig {
+    /// The endpoint to Azure Blob Storage
+    /// e.g. `https://{account}.blob.core.windows.net`
+    #[arg(long, env = "P_AZR_URL", value_name = "url", required = true)]
+    pub endpoint_url: String,
+
+    /// The Azure Storage Account
+    #[arg(long, env = "P_AZR_ACCOUNT", value_name = "account", required = true)]
+    pub account: String,
+
+    /// The Azure Storage Access key
+    #[arg(
+        long,
+        env = "P_AZR_ACCESS_KEY",
+        value_name = "access-key",
+        required = true
+    )]
+    pub access_key: String,
+
+    /// Client ID
+    #[arg(
+        long,
+        env = "P_AZR_CLIENT_ID",
+        value_name = "client-id",
+        required = false
+    )]
+    pub client_id: Option<String>,
+
+    /// Client Secret
+    #[arg(
+        long,
+        env = "P_AZR_CLIENT_SECRET",
+        value_name = "client-secret",
+        required = false
+    )]
+    pub client_secret: Option<String>,
+
+    /// Tenant ID
+    #[arg(
+        long,
+        env = "P_AZR_TENANT_ID",
+        value_name = "tenant-id",
+        required = false
+    )]
+    pub tenant_id: Option<String>,
+
+    /// The container name to be used for storage
+    #[arg(
+        long,
+        env = "P_AZR_CONTAINER",
+        value_name = "container",
+        required = true
+    )]
+    pub container: String,
+}
+
+impl AzureBlobConfig {
+    fn get_default_builder(&self) -> MicrosoftAzureBuilder {
+        let client_options = ClientOptions::default()
+            .with_allow_http(true)
+            .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
+            .with_timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS));
+
+        let mut builder = MicrosoftAzureBuilder::new()
+            .with_endpoint(self.endpoint_url.clone())
+            .with_account(&self.account)
+            .with_access_key(&self.access_key)
+            .with_container_name(&self.container);
+
+        if let (Some(client_id), Some(client_secret), Some(tenant_id)) = (
+            self.client_id.clone(),
+            self.client_secret.clone(),
+            self.tenant_id.clone(),
+        ) {
+            builder = builder.with_client_secret_authorization(client_id, client_secret, tenant_id)
+        }
+
+        builder.with_client_options(client_options)
+    }
+}
+
+impl ObjectStorageProvider for AzureBlobConfig {
+    fn get_datafusion_runtime(&self) -> RuntimeConfig {
+        let azure = self.get_default_builder().build().unwrap();
+        // limit objectstore to a concurrent request limit
+        let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
+        let azure = MetricLayer::new(azure);
+
+        let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
+        let url = ObjectStoreUrl::parse(format!("https://{}.blob.core.windows.net", self.account))
+            .unwrap();
+        object_store_registry.register_store(url.as_ref(), Arc::new(azure));
+
+        RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
+    }
+
+    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
+        let azure = self.get_default_builder().build().unwrap();
+        // limit objectstore to a concurrent request limit
+        let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
+        Arc::new(ObjStoreClient::new(
+            azure,
+            self.account.clone(),
+            self.container.clone(),
+            StorePath::from(""),
+        ))
+    }
+
+    fn get_endpoint(&self) -> String {
+        self.endpoint_url.clone()
+    }
+
+    fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
+        self.register_metrics(handler)
+    }
+}
+
+pub fn to_object_store_path(path: &RelativePath) -> StorePath {
+    StorePath::from(path.as_str())
+}
+
+// ObjStoreClient is a generic client that enables interaction with different
+// cloud providers' object stores, such as S3 and Azure Blob Storage
+pub struct ObjStoreClient<T: ObjectStore> {
+    client: LimitStore<T>,
+    account: String,
+    container_name: String,
+    root: StorePath,
+}
+
+impl<T: ObjectStore> ObjStoreClient<T> {
+    pub fn new(
+        client: LimitStore<T>,
+        account: String,
+        container_name: String,
+        root: StorePath,
+    ) -> Self {
+        ObjStoreClient {
+            client,
+            account,
+            container_name,
+            root,
+        }
+    }
+
+    async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
+        let instant = Instant::now();
+
+        let resp = self.client.get(&to_object_store_path(path)).await;
+
+        match resp {
+            Ok(resp) => {
+                let time = instant.elapsed().as_secs_f64();
+                REQUEST_RESPONSE_TIME
+                    .with_label_values(&["GET", "200"])
+                    .observe(time);
+                let body = resp.bytes().await.unwrap();
+                Ok(body)
+            }
+            Err(err) => {
+                let time = instant.elapsed().as_secs_f64();
+                REQUEST_RESPONSE_TIME
+                    .with_label_values(&["GET", "400"])
+                    .observe(time);
+                Err(err.into())
+            }
+        }
+    }
+
+    async fn _put_object(
+        &self,
+        path: &RelativePath,
+        resource: PutPayload,
+    ) -> Result<(), ObjectStorageError> {
+        let time = Instant::now();
+        let resp = self.client.put(&to_object_store_path(path), resource).await;
+        let status = if resp.is_ok() { "200" } else { "400" };
+        let time = time.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["PUT", status])
+            .observe(time);
+
+        if let Err(object_store::Error::NotFound { source, .. }) = &resp {
+            return Err(ObjectStorageError::Custom(
+                format!("Failed to upload, error: {:?}", source).to_string(),
+            ));
+        }
+
+        resp.map(|_| ()).map_err(|err| err.into())
+    }
+
+    async fn _delete_prefix(&self, key: &str) -> Result<(), ObjectStorageError> {
+        let object_stream = self.client.list(Some(&(key.into())));
+
+        object_stream
+            .for_each_concurrent(None, |x| async {
+                match x {
+                    Ok(obj) => {
+                        if (self.client.delete(&obj.location).await).is_err() {
+                            log::error!("Failed to delete object during delete stream");
+                        }
+                    }
+                    Err(_) => {
+                        log::error!("Failed to fetch object during delete stream");
+                    }
+                };
+            })
+            .await;
+
+        Ok(())
+    }
+
+    async fn _list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+        let resp = self.client.list_with_delimiter(None).await?;
+
+        let common_prefixes = resp.common_prefixes; // get all dirs
+
+        // return prefixes at the root level
+        let dirs: Vec<_> = common_prefixes
+            .iter()
+            .filter_map(|path| path.parts().next())
+            .map(|name| name.as_ref().to_string())
+            .filter(|x| x != PARSEABLE_ROOT_DIRECTORY)
+            .filter(|x| x != USERS_ROOT_DIR)
+            .collect();
+
+        let stream_json_check = FuturesUnordered::new();
+
+        for dir in &dirs {
+            let key = format!(
+                "{}/{}/{}",
+                dir, STREAM_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME
+            );
+            let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
+            stream_json_check.push(task);
+        }
+
+        stream_json_check.try_collect::<()>().await?;
+
+        Ok(dirs.into_iter().map(|name| LogStream { name }).collect())
+    }
+
+    async fn _list_dates(&self, stream: &str) -> Result<Vec<String>, ObjectStorageError> {
+        let resp = self
+            .client
+            .list_with_delimiter(Some(&(stream.into())))
+            .await?;
+
+        let common_prefixes = resp.common_prefixes;
+
+        // return prefixes at the root level
+        let dates: Vec<_> = common_prefixes
+            .iter()
+            .filter_map(|path| path.as_ref().strip_prefix(&format!("{stream}/")))
+            .map(String::from)
+            .collect();
+
+        Ok(dates)
+    }
+
+    async fn _list_manifest_files(
+        &self,
+        stream: &str,
+    ) -> Result<BTreeMap<String, Vec<String>>, ObjectStorageError> {
+        let mut result_file_list: BTreeMap<String, Vec<String>> = BTreeMap::new();
+        let resp = self
+            .client
+            .list_with_delimiter(Some(&(stream.into())))
+            .await?;
+
+        let dates = resp
+            .common_prefixes
+            .iter()
+            .flat_map(|path| path.parts())
+            .filter(|name| name.as_ref() != stream && name.as_ref() != STREAM_ROOT_DIRECTORY)
+            .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>();
+        for date in dates {
+            let date_path = object_store::path::Path::from(format!("{}/{}", stream, &date));
+            let resp = self.client.list_with_delimiter(Some(&date_path)).await?;
+            let manifests: Vec<String> = resp
+                .objects
+                .iter()
+                .filter(|name| name.location.filename().unwrap().ends_with("manifest.json"))
+                .map(|name| name.location.to_string())
+                .collect();
+            result_file_list.entry(date).or_default().extend(manifests);
+        }
+        Ok(result_file_list)
+    }
+    async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+        let instant = Instant::now();
+
+        // // TODO: Uncomment this when multipart is fixed
+        // let should_multipart = std::fs::metadata(path)?.len() > MULTIPART_UPLOAD_SIZE as u64;
+
+        let should_multipart = false;
+
+        let res = if should_multipart {
+            // self._upload_multipart(key, path).await
+            // this branch will never get executed
+            Ok(())
+        } else {
+            let bytes = tokio::fs::read(path).await?;
+            let result = self.client.put(&key.into(), bytes.into()).await?;
+            log::info!("Uploaded file to Azure Blob Storage: {:?}", result);
+            Ok(())
+        };
+
+        let status = if res.is_ok() { "200" } else { "400" };
+        let time = instant.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["UPLOAD_PARQUET", status])
+            .observe(time);
+
+        res
+    }
+
+    // TODO: introduce parallel, multipart-uploads if required
+    // async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    //     let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
+    //     let mut file = OpenOptions::new().read(true).open(path).await?;
+
+    //     // let (multipart_id, mut async_writer) = self.client.put_multipart(&key.into()).await?;
+    //     let mut async_writer = self.client.put_multipart(&key.into()).await?;
+
+    //     /* `abort_multipart()` has been removed */
+    //     // let close_multipart = |err| async move {
+    //     //     log::error!("multipart upload failed. {:?}", err);
+    //     //     self.client
+    //     //         .abort_multipart(&key.into(), &multipart_id)
+    //     //         .await
+    //     // };
+
+    //     loop {
+    //         match file.read(&mut buf).await {
+    //             Ok(len) => {
+    //                 if len == 0 {
+    //                     break;
+    //                 }
+    //                 if let Err(err) = async_writer.write_all(&buf[0..len]).await {
+    //                     // close_multipart(err).await?;
+    //                     break;
+    //                 }
+    //                 if let Err(err) = async_writer.flush().await {
+    //                     // close_multipart(err).await?;
+    //                     break;
+    //                 }
+    //             }
+    //             Err(err) => {
+    //                 // close_multipart(err).await?;
+    //                 break;
+    //             }
+    //         }
+    //     }
+
+    //     async_writer.shutdown().await?;
+
+    //     Ok(())
+    // }
+}
+
+#[async_trait]
+impl<T: ObjectStore> ObjectStorage for ObjStoreClient<T> {
+    async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
+        Ok(self._get_object(path).await?)
+    }
+
+    async fn get_objects(
+        &self,
+        base_path: Option<&RelativePath>,
+        filter_func: Box<dyn Fn(String) -> bool + Send>,
+    ) -> Result<Vec<Bytes>, ObjectStorageError> {
+        let instant = Instant::now();
+
+        let prefix = if let Some(base_path) = base_path {
+            to_object_store_path(base_path)
+        } else {
+            self.root.clone()
+        };
+
+        let mut list_stream = self.client.list(Some(&prefix));
+
+        let mut res = vec![];
+
+        while let Some(meta) = list_stream.next().await.transpose()? {
+            let ingestor_file = filter_func(meta.location.filename().unwrap().to_string());
+
+            if !ingestor_file {
+                continue;
+            }
+
+            let byts = self
+                .get_object(
+                    RelativePath::from_path(meta.location.as_ref())
+                        .map_err(ObjectStorageError::PathError)?,
+                )
+                .await?;
+
+            res.push(byts);
+        }
+
+        let instant = instant.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["GET", "200"])
+            .observe(instant);
+
+        Ok(res)
+    }
+
+    async fn get_ingestor_meta_file_paths(
+        &self,
+    ) -> Result<Vec<RelativePathBuf>, ObjectStorageError> {
+        let time = Instant::now();
+        let mut path_arr = vec![];
+        let mut object_stream = self.client.list(Some(&self.root));
+
+        while let Some(meta) = object_stream.next().await.transpose()? {
+            let flag = meta.location.filename().unwrap().starts_with("ingestor");
+
+            if flag {
+                path_arr.push(RelativePathBuf::from(meta.location.as_ref()));
+            }
+        }
+
+        let time = time.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["GET", "200"])
+            .observe(time);
+
+        Ok(path_arr)
+    }
+
+    async fn get_stream_file_paths(
+        &self,
+        stream_name: &str,
+    ) -> Result<Vec<RelativePathBuf>, ObjectStorageError> {
+        let time = Instant::now();
+        let mut path_arr = vec![];
+        let path = to_object_store_path(&RelativePathBuf::from(stream_name));
+        let mut object_stream = self.client.list(Some(&path));
+
+        while let Some(meta) = object_stream.next().await.transpose()? {
+            let flag = meta.location.filename().unwrap().starts_with(".ingestor");
+
+            if flag {
+                path_arr.push(RelativePathBuf::from(meta.location.as_ref()));
+            }
+        }
+
+        path_arr.push(RelativePathBuf::from_iter([
+            stream_name,
+            STREAM_METADATA_FILE_NAME,
+        ]));
+        path_arr.push(RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]));
+
+        let time = time.elapsed().as_secs_f64();
+        REQUEST_RESPONSE_TIME
+            .with_label_values(&["GET", "200"])
+            .observe(time);
+
+        Ok(path_arr)
+    }
+
+    async fn put_object(
+        &self,
+        path: &RelativePath,
+        resource: Bytes,
+    ) -> Result<(), ObjectStorageError> {
+        self._put_object(path, resource.into())
+            .await
+            .map_err(|err| ObjectStorageError::ConnectionError(Box::new(err)))?;
+
+        Ok(())
+    }
+
+    async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError> {
+        self._delete_prefix(path.as_ref()).await?;
+
+        Ok(())
+    }
+
+    async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> {
+        Ok(self.client.delete(&to_object_store_path(path)).await?)
+    }
+
+    async fn check(&self) -> Result<(), ObjectStorageError> {
+        Ok(self
+            .client
+            .head(&to_object_store_path(&parseable_json_path()))
+            .await
+            .map(|_| ())?)
+    }
+
+    async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
+        self._delete_prefix(stream_name).await?;
+
+        Ok(())
+    }
+
+    async fn try_delete_ingestor_meta(
+        &self,
+        ingestor_filename: String,
+    ) -> Result<(), ObjectStorageError> {
+        let file = RelativePathBuf::from(&ingestor_filename);
+        match self.client.delete(&to_object_store_path(&file)).await {
+            Ok(_) => Ok(()),
+            Err(err) => {
+                // a missing object means the given path was incorrect;
+                // surface the error to the caller either way
+                if matches!(err, object_store::Error::NotFound { .. }) {
+                    log::error!("Node does not exist");
+                    Err(err.into())
+                } else {
+                    log::error!("Error deleting ingestor meta file: {:?}", err);
+                    Err(err.into())
+                }
+            }
+        }
+    }
+
+    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+        let streams = self._list_streams().await?;
+
+        Ok(streams)
+    }
+
+    async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
+        let resp = self.client.list_with_delimiter(None).await?;
+
+        let common_prefixes = resp.common_prefixes; // get all dirs
+
+        // return prefixes at the root level
+        let dirs: Vec<_> = common_prefixes
+            .iter()
+            .filter_map(|path| path.parts().next())
+            .map(|name| name.as_ref().to_string())
+            .filter(|x| x != PARSEABLE_ROOT_DIRECTORY)
+            .collect();
+
+        let stream_json_check = FuturesUnordered::new();
+
+        for dir in &dirs {
+            let key = format!("{}/{}", dir, STREAM_METADATA_FILE_NAME);
+            let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
+            stream_json_check.push(task);
+        }
+
+        stream_json_check.try_collect::<()>().await?;
+
+        Ok(dirs.into_iter().map(|name| LogStream { name }).collect())
+    }
+
+    async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
+        let streams = self._list_dates(stream_name).await?;
+
+        Ok(streams)
+    }
+
+    async fn list_manifest_files(
+        &self,
+        stream_name: &str,
+    ) -> Result<BTreeMap<String, Vec<String>>, ObjectStorageError> {
+        let files = self._list_manifest_files(stream_name).await?;
+
+        Ok(files)
+    }
+
+    async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+        self._upload_file(key, path).await?;
+
+        Ok(())
+    }
+
+    fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path {
+        object_store::path::Path::parse(prefix).unwrap()
+    }
+
+    fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
+        prefixes
+            .into_iter()
+            .map(|prefix| {
+                let path = format!(
+                    "https://{}.blob.core.windows.net/{}/{}",
+                    self.account, self.container_name, prefix
+                );
+                ListingTableUrl::parse(path).unwrap()
+            })
+            .collect()
+    }
+
+    fn store_url(&self) -> Url {
+        let url_string = format!("https://{}.blob.core.windows.net", self.account);
+        Url::parse(&url_string).unwrap()
+    }
+
+    async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError> {
+        let pre = object_store::path::Path::from("/");
+        let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+
+        Ok(resp
+            .common_prefixes
+            .iter()
+            .flat_map(|path| path.parts())
+            .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>())
+    }
+
+    async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
+        let mut dashboards = vec![];
+        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
+        let resp = self
+            .client
+            .list_with_delimiter(Some(&users_root_path))
+            .await?;
+
+        let users = resp
+            .common_prefixes
+            .iter()
+            .flat_map(|path| path.parts())
+            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
+            .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>();
+        for user in users {
+            let user_dashboard_path = object_store::path::Path::from(format!(
+                "{}/{}/{}",
+                USERS_ROOT_DIR, user, "dashboards"
+            ));
+            let dashboards_path = RelativePathBuf::from(&user_dashboard_path);
+            let dashboard_bytes = self
+                .get_objects(
+                    Some(&dashboards_path),
+                    Box::new(|file_name| file_name.ends_with(".json")),
+                )
+                .await?;
+            dashboards.extend(dashboard_bytes);
+        }
+        Ok(dashboards)
+    }
+
+    async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
+        let mut filters = vec![];
+        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
+        let resp = self
+            .client
+            .list_with_delimiter(Some(&users_root_path))
+            .await?;
+
+        let users = resp
+            .common_prefixes
+            .iter()
+            .flat_map(|path| path.parts())
+            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
+            .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>();
+        for user in users {
+            let user_filters_path = object_store::path::Path::from(format!(
+                "{}/{}/{}",
+                USERS_ROOT_DIR, user, "filters"
+            ));
+            let resp = self
+                .client
+                .list_with_delimiter(Some(&user_filters_path))
+                .await?;
+            let streams = resp
+                .common_prefixes
+                .iter()
+                .filter(|name| name.as_ref() != USERS_ROOT_DIR)
+                .map(|name| name.as_ref().to_string())
+                .collect::<Vec<_>>();
+            for stream in streams {
+                let filters_path = RelativePathBuf::from(&stream);
+                let filter_bytes = self
+                    .get_objects(
+                        Some(&filters_path),
+                        Box::new(|file_name| file_name.ends_with(".json")),
+                    )
+                    .await?;
+                filters.extend(filter_bytes);
+            }
+        }
+        Ok(filters)
+    }
+
+    fn get_bucket_name(&self) -> String {
+        self.container_name.clone()
+    }
+}
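One detail worth noting in `get_default_builder()`: service-principal credentials are only applied when `client_id`, `client_secret`, and `tenant_id` are all present; otherwise the access key alone authenticates. A sketch of the two configurations, using the struct fields defined above (all values are placeholders):

    // Access-key auth: the three optional service-principal fields stay None.
    let key_auth = AzureBlobConfig {
        endpoint_url: "https://myaccount.blob.core.windows.net".into(),
        account: "myaccount".into(),
        access_key: "<access-key>".into(),
        client_id: None,
        client_secret: None,
        tenant_id: None,
        container: "parseable".into(),
    };

    // Service-principal auth: with all three set, get_default_builder() also
    // calls with_client_secret_authorization(client_id, client_secret, tenant_id).
    let sp_auth = AzureBlobConfig {
        client_id: Some("<client-id>".into()),
        client_secret: Some("<client-secret>".into()),
        tenant_id: Some("<tenant-id>".into()),
        ..key_auth
    };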
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index d359a11a3..ebb4cd273 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -25,7 +25,7 @@ use datafusion::datasource::object_store::{
 use datafusion::execution::runtime_env::RuntimeConfig;
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
-use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
+use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey, Checksum};
 use object_store::limit::LimitStore;
 use object_store::path::Path as StorePath;
 use object_store::{ClientOptions, ObjectStore, PutPayload};
@@ -200,7 +200,7 @@ impl ObjectStorageProvider for S3Config {
         // limit objectstore to a concurrent request limit
         let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
 
-        Arc::new(S3 {
+        Arc::new(ObjStoreClient {
             client: s3,
             bucket: self.bucket_name.clone(),
             root: StorePath::from(""),
@@ -216,17 +216,19 @@ impl ObjectStorageProvider for S3Config {
     }
 }
 
-fn to_object_store_path(path: &RelativePath) -> StorePath {
+pub fn to_object_store_path(path: &RelativePath) -> StorePath {
     StorePath::from(path.as_str())
 }
 
-pub struct S3 {
-    client: LimitStore<AmazonS3>,
+// ObjStoreClient is a generic client that enables interaction with different
+// cloud providers' object stores, such as S3 and Azure Blob Storage
+pub struct ObjStoreClient<T: ObjectStore> {
+    client: LimitStore<T>,
     bucket: String,
     root: StorePath,
 }
 
-impl S3 {
+impl<T: ObjectStore> ObjStoreClient<T> {
     async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
         let instant = Instant::now();
 
@@ -448,7 +450,7 @@ impl S3 {
 }
 
 #[async_trait]
-impl ObjectStorage for S3 {
+impl<T: ObjectStore> ObjectStorage for ObjStoreClient<T> {
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
         Ok(self._get_object(path).await?)
     }
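A closing note on how the query path fits together: `query_prefixes()` emits `ListingTableUrl`s under `https://{account}.blob.core.windows.net/...`, and DataFusion resolves them against the registry populated in `get_datafusion_runtime()`. A compressed sketch of that lookup contract, using an in-memory store as a stand-in for the Azure client (APIs as pinned in Cargo.toml, object_store 0.11 / datafusion 42):

    use datafusion::datasource::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
    };
    use object_store::memory::InMemory;
    use std::sync::Arc;

    // Register a store under the same scheme/host that query_prefixes() emits;
    // DataFusion later looks it up by URL when scanning a ListingTableUrl.
    let registry = DefaultObjectStoreRegistry::new();
    let url = ObjectStoreUrl::parse("https://myaccount.blob.core.windows.net").unwrap();
    registry.register_store(url.as_ref(), Arc::new(InMemory::new()));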