Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Generic object store access implementation #2709

Merged
merged 6 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions crates/datasources/src/excel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use datafusion::datasource::MemTable;
use object_store::ObjectStore;

use crate::common::url::DatasourceUrl;
use crate::object_store::generic::GenericStoreAccess;
use crate::object_store::ObjStoreAccess;

pub mod errors;
Expand All @@ -28,7 +27,7 @@ pub struct ExcelTable {

impl ExcelTable {
pub async fn open(
store_access: GenericStoreAccess,
store_access: Arc<dyn ObjStoreAccess>,
source_url: DatasourceUrl,
sheet_name: Option<&str>,
has_header: bool,
Expand Down
3 changes: 1 addition & 2 deletions crates/datasources/src/json/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ use serde_json::{Map, Value};
use crate::common::url::DatasourceUrl;
use crate::json::errors::JsonError;
use crate::json::stream::{JsonPartitionStream, LazyJsonPartitionStream};
use crate::object_store::generic::GenericStoreAccess;
use crate::object_store::ObjStoreAccess;

pub async fn json_streaming_table(
store_access: GenericStoreAccess,
store_access: Arc<dyn ObjStoreAccess>,
source_url: DatasourceUrl,
) -> Result<Arc<dyn TableProvider>, JsonError> {
let path = source_url.path();
Expand Down
77 changes: 52 additions & 25 deletions crates/datasources/src/lake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,31 @@
pub mod delta;
pub mod iceberg;

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
use object_store::local::LocalFileSystem;
use object_store::aws::AmazonS3ConfigKey;
use object_store::azure::AzureConfigKey;
use object_store::gcp::GoogleConfigKey;
use object_store::ObjectStore;
use protogen::metastore::types::options::StorageOptions;

use crate::common::url::{DatasourceUrl, DatasourceUrlType};
use crate::object_store::azure::AzureStoreAccess;
use crate::object_store::gcs::GcsStoreAccess;
use crate::object_store::local::LocalStoreAccess;
use crate::object_store::s3::S3StoreAccess;
use crate::object_store::ObjStoreAccess;

#[derive(Debug, thiserror::Error)]
pub enum LakeStorageOptionsError {
#[error(transparent)]
ObjectStore(#[from] object_store::Error),

#[error(transparent)]
ObjectStoreSource(#[from] crate::object_store::errors::ObjectStoreSourceError),

#[error(transparent)]
Common(#[from] crate::common::errors::DatasourceCommonError),

Expand All @@ -30,61 +38,80 @@ pub enum LakeStorageOptionsError {
UnsupportedObjectStore(DatasourceUrl),
}

/// Create an object store from the provided storage options.
pub fn storage_options_into_object_store(
pub fn storage_options_into_store_access(
url: &DatasourceUrl,
opts: &StorageOptions,
) -> Result<Arc<dyn ObjectStore>, LakeStorageOptionsError> {
) -> Result<Arc<dyn ObjStoreAccess>, LakeStorageOptionsError> {
match url.datasource_url_type() {
DatasourceUrlType::S3 => {
let bucket = url
.host()
.ok_or_else(|| LakeStorageOptionsError::MissingHost(url.clone()))?;

let mut store = AmazonS3Builder::new().with_bucket_name(bucket);

let mut s3_opts = HashMap::new();
for (key, value) in &opts.inner {
if let Ok(s3_key) = AmazonS3ConfigKey::from_str(key) {
store = store.with_config(s3_key, value);
s3_opts.insert(s3_key, value.to_string());
}
}
Ok(Arc::new(store.build()?))

Ok(Arc::new(S3StoreAccess {
bucket: bucket.to_string(),
opts: s3_opts,
region: None,
access_key_id: None,
secret_access_key: None,
}))
}
DatasourceUrlType::Gcs => {
let bucket = url
.host()
.ok_or_else(|| LakeStorageOptionsError::MissingHost(url.clone()))?;

let mut store = GoogleCloudStorageBuilder::new().with_bucket_name(bucket);

let mut gcs_opts = HashMap::new();
for (key, value) in &opts.inner {
if let Ok(gcp_key) = GoogleConfigKey::from_str(key) {
store = store.with_config(gcp_key, value);
if let Ok(gcs_key) = GoogleConfigKey::from_str(key) {
gcs_opts.insert(gcs_key, value.to_string());
}
}
Ok(Arc::new(store.build()?))

Ok(Arc::new(GcsStoreAccess {
bucket: bucket.to_string(),
opts: gcs_opts,
service_account_key: None,
}))
}
DatasourceUrlType::Azure => {
let bucket = url
let container = url
.host()
.ok_or_else(|| LakeStorageOptionsError::MissingHost(url.clone()))?;

let mut store = MicrosoftAzureBuilder::new().with_container_name(bucket);

let mut azure_opts = HashMap::new();
for (key, value) in &opts.inner {
if let Ok(azure_key) = AzureConfigKey::from_str(key) {
store = store.with_config(azure_key, value);
azure_opts.insert(azure_key, value.to_string());
}
}

Ok(Arc::new(store.build()?))
}
DatasourceUrlType::File => {
let store = LocalFileSystem::new();
Ok(Arc::new(store))
Ok(Arc::new(AzureStoreAccess {
container: container.to_string(),
opts: azure_opts,
account_name: None,
access_key: None,
}))
}
DatasourceUrlType::File => Ok(Arc::new(LocalStoreAccess)),
DatasourceUrlType::Http => {
Err(LakeStorageOptionsError::UnsupportedObjectStore(url.clone()))
}
}
}

/// Create an object store from the provided storage options.
pub fn storage_options_into_object_store(
url: &DatasourceUrl,
opts: &StorageOptions,
) -> Result<Arc<dyn ObjectStore>, LakeStorageOptionsError> {
let access = storage_options_into_store_access(url, opts)?;
Ok(access.create_store()?)
}
92 changes: 92 additions & 0 deletions crates/datasources/src/object_store/azure.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;

use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
use object_store::path::Path as ObjectStorePath;
use object_store::ObjectStore;
use protogen::metastore::types::options::StorageOptions;

use super::errors::{ObjectStoreSourceError, Result};
use super::ObjStoreAccess;
use crate::common::url::{DatasourceUrl, DatasourceUrlType};

/// Configuration needed to reach a single Azure storage container.
///
/// The `ObjStoreAccess` implementation for this type feeds these values into
/// a `MicrosoftAzureBuilder`: `opts` are applied first, then `account_name`
/// and `access_key` (when set), and finally `container`.
#[derive(Debug, Clone)]
pub struct AzureStoreAccess {
/// Container name for Azure store. Also used to form the `azure://<container>`
/// base URL.
pub container: String,
/// Account name for Azure store. When `Some`, it is set on the builder after
/// `opts` have been applied.
pub account_name: Option<String>,
/// Access key for Azure store account. When `Some`, it is set on the builder
/// after `opts` have been applied.
pub access_key: Option<String>,
/// Other options for Azure store, keyed by the parsed `AzureConfigKey`.
pub opts: HashMap<AzureConfigKey, String>,
}

impl AzureStoreAccess {
    /// Construct an [`AzureStoreAccess`] from a datasource URL and raw storage
    /// options.
    ///
    /// The container name is taken from the URL host. Every entry in
    /// `opts.inner` is parsed into an [`AzureConfigKey`]; `account_name` and
    /// `access_key` are left unset.
    ///
    /// # Errors
    ///
    /// Returns an error when the URL is not an Azure URL, when it has no host
    /// component, or when any option key fails to parse as an
    /// [`AzureConfigKey`].
    pub fn try_from_uri(uri: &DatasourceUrl, opts: &StorageOptions) -> Result<Self> {
        if uri.datasource_url_type() != DatasourceUrlType::Azure {
            let msg = format!("invalid URL scheme for azure table: {uri}");
            return Err(ObjectStoreSourceError::String(msg));
        }

        let Some(container) = uri.host() else {
            let msg = format!("missing container name in URI: {uri}");
            return Err(ObjectStoreSourceError::String(msg));
        };

        // Stop at the first key that is not a recognised Azure config key.
        let mut parsed_opts = HashMap::new();
        for (raw_key, raw_value) in opts.inner.iter() {
            let config_key: AzureConfigKey = raw_key.parse()?;
            parsed_opts.insert(config_key, raw_value.to_string());
        }

        Ok(Self {
            container: container.to_string(),
            account_name: None,
            access_key: None,
            opts: parsed_opts,
        })
    }
}

impl Display for AzureStoreAccess {
    /// Renders the access as `Azure(container: <container name>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let container = &self.container;
        f.write_fmt(format_args!("Azure(container: {container})"))
    }
}

impl ObjStoreAccess for AzureStoreAccess {
    /// Base URL for this store, of the form `azure://<container>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the formatted URL cannot be parsed into an
    /// [`ObjectStoreUrl`].
    fn base_url(&self) -> Result<ObjectStoreUrl> {
        let parsed = ObjectStoreUrl::parse(format!("azure://{}", self.container))?;
        Ok(parsed)
    }

    /// Build the Azure object store described by this access.
    ///
    /// Builder calls are issued in a fixed order: the generic `opts` first,
    /// then the explicit account name and access key (when present), and the
    /// container name last.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying builder fails to build the store.
    fn create_store(&self) -> Result<Arc<dyn ObjectStore>> {
        // Apply the free-form options before any of the dedicated fields.
        let builder = self
            .opts
            .iter()
            .fold(MicrosoftAzureBuilder::new(), |acc, (key, val)| {
                acc.with_config(*key, val)
            });

        let builder = match &self.account_name {
            Some(account_name) => builder.with_account(account_name),
            None => builder,
        };

        let builder = match &self.access_key {
            Some(access_key) => builder.with_access_key(access_key),
            None => builder,
        };

        let store = builder.with_container_name(&self.container).build()?;
        Ok(Arc::new(store))
    }

    /// Convert a URL-style location string into an object store path.
    ///
    /// # Errors
    ///
    /// Returns an error if `location` is not a valid URL path.
    fn path(&self, location: &str) -> Result<ObjectStorePath> {
        let parsed = ObjectStorePath::from_url_path(location)?;
        Ok(parsed)
    }
}
8 changes: 8 additions & 0 deletions crates/datasources/src/object_store/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@ pub enum ObjectStoreSourceError {
#[error(transparent)]
IoError(#[from] std::io::Error),

#[error(transparent)]
DatasourceCommonError(#[from] crate::common::errors::DatasourceCommonError),

#[error("No file extension provided")]
NoFileExtension,

#[error("This file type is not supported: {0}")]
NotSupportFileType(String),

#[error("{0}")]
InvalidHttpStatus(String),

#[error("{0}")]
Static(&'static str),

#[error("{0}")]
String(String),

#[error("Failed to read object over http: {0}")]
Reqwest(#[from] reqwest::Error),
}
Expand Down
25 changes: 15 additions & 10 deletions crates/datasources/src/object_store/gcs.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;

use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
use object_store::path::Path as ObjectStorePath;
use object_store::ObjectStore;

Expand All @@ -15,6 +16,8 @@ pub struct GcsStoreAccess {
pub bucket: String,
/// Service account key (JSON) for credentials.
pub service_account_key: Option<String>,
/// Other options for GCS.
pub opts: HashMap<GoogleConfigKey, String>,
}

impl Display for GcsStoreAccess {
Expand All @@ -31,15 +34,17 @@ impl ObjStoreAccess for GcsStoreAccess {
}

fn create_store(&self) -> Result<Arc<dyn ObjectStore>> {
let builder = GoogleCloudStorageBuilder::new().with_bucket_name(&self.bucket);
let builder = match &self.service_account_key {
Some(key) => builder.with_service_account_key(key),
None => {
// TODO: Null Credentials
builder
}
};
let build = builder.build()?;
let mut builder = GoogleCloudStorageBuilder::new();

for (key, val) in self.opts.iter() {
builder = builder.with_config(*key, val);
}

if let Some(service_account_key) = &self.service_account_key {
builder = builder.with_service_account_key(service_account_key);
}

let build = builder.with_bucket_name(&self.bucket).build()?;
Ok(Arc::new(build))
}

Expand Down
65 changes: 0 additions & 65 deletions crates/datasources/src/object_store/generic.rs

This file was deleted.

Loading