Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Object Store (AWS): Support region configured via named profile #4161

Merged
merged 10 commits into from
May 16, 2023
62 changes: 0 additions & 62 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,68 +515,6 @@ async fn web_identity(
})
}

#[cfg(feature = "aws_profile")]
mod profile {
use super::*;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::provider_config::ProviderConfig;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::region::Region;
use std::time::SystemTime;

#[derive(Debug)]
pub struct ProfileProvider {
cache: TokenCache<Arc<AwsCredential>>,
credentials: ProfileFileCredentialsProvider,
}

impl ProfileProvider {
pub fn new(name: String, region: String) -> Self {
let config = ProviderConfig::default().with_region(Some(Region::new(region)));

Self {
cache: Default::default(),
credentials: ProfileFileCredentialsProvider::builder()
.configure(&config)
.profile_name(name)
.build(),
}
}
}

impl CredentialProvider for ProfileProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(self.cache.get_or_insert_with(move || async move {
let c =
self.credentials
.provide_credentials()
.await
.map_err(|source| crate::Error::Generic {
store: "S3",
source: Box::new(source),
})?;
let t_now = SystemTime::now();
let expiry = c
.expiry()
.and_then(|e| e.duration_since(t_now).ok())
.map(|ttl| Instant::now() + ttl);

Ok(TemporaryToken {
token: Arc::new(AwsCredential {
key_id: c.access_key_id().to_string(),
secret_key: c.secret_access_key().to_string(),
token: c.session_token().map(ToString::to_string),
}),
expiry,
})
}))
}
}
}

#[cfg(feature = "aws_profile")]
pub use profile::ProfileProvider;

#[cfg(test)]
mod tests {
use super::*;
Expand Down
86 changes: 83 additions & 3 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ mod checksum;
mod client;
mod credential;

#[cfg(feature = "aws_profile")]
mod profile;

// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
//
// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
Expand Down Expand Up @@ -434,7 +437,7 @@ pub struct AmazonS3Builder {

/// Configuration keys for [`AmazonS3Builder`]
///
/// Configuration via keys can be dome via the [`try_with_option`](AmazonS3Builder::try_with_option)
/// Configuration via keys can be done via the [`try_with_option`](AmazonS3Builder::try_with_option)
/// or [`with_options`](AmazonS3Builder::try_with_options) methods on the builder.
///
/// # Example
Expand Down Expand Up @@ -986,8 +989,14 @@ impl AmazonS3Builder {
self.parse_url(&url)?;
}

let region = match (self.region.clone(), self.profile.clone()) {
(Some(region), _) => Some(region),
(None, Some(profile)) => profile_region(profile),
(None, None) => None,
};

let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
let region = self.region.context(MissingRegionSnafu)?;
let region = region.context(MissingRegionSnafu)?;

let credentials = match (self.access_key_id, self.secret_access_key, self.token) {
(Some(key_id), Some(secret_key), token) => {
Expand Down Expand Up @@ -1094,12 +1103,36 @@ impl AmazonS3Builder {
}
}

#[cfg(feature = "aws_profile")]
fn profile_region(profile: String) -> Option<String> {
use std::{panic, thread};
use tokio::runtime::Handle;

let handle = Handle::current();
let provider = profile::ProfileProvider::new(profile, None);

let result = thread::spawn(move || handle.block_on(provider.get_region()));

match result.join() {
Ok(region) => region,
Err(e) => panic::resume_unwind(e),
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let result = thread::spawn(move || handle.block_on(provider.get_region()));
match result.join() {
Ok(region) => region,
Err(e) => panic::resume_unwind(e),
}
handle.block_on(provider.get_region())

Both are equally "problematic" in that they pin a tokio worker, but this at least avoids spawning an additional thread

}

#[cfg(feature = "aws_profile")]
fn profile_credentials(
profile: String,
region: String,
) -> Result<Box<dyn CredentialProvider>> {
Ok(Box::new(credential::ProfileProvider::new(profile, region)))
Ok(Box::new(profile::ProfileProvider::new(
profile,
Some(region),
)))
}

#[cfg(not(feature = "aws_profile"))]
fn profile_region(_profile: String) -> Option<String> {
None
}

#[cfg(not(feature = "aws_profile"))]
Expand Down Expand Up @@ -1563,3 +1596,50 @@ mod tests {
}
}
}

#[cfg(all(test, feature = "aws_profile"))]
mod profile_tests {
use super::*;
use std::env;

use super::profile::{TEST_PROFILE_NAME, TEST_PROFILE_REGION};

#[tokio::test]
async fn s3_test_region_from_profile() {
let s3_url = "s3://bucket/prefix".to_owned();

let s3 = AmazonS3Builder::new()
.with_url(s3_url)
.with_profile(TEST_PROFILE_NAME)
.build()
.unwrap();

let region = &s3.client.config().region;

assert_eq!(region, TEST_PROFILE_REGION);
}

#[test]
fn s3_test_region_override() {
let s3_url = "s3://bucket/prefix".to_owned();

let aws_profile =
env::var("AWS_PROFILE").unwrap_or_else(|_| TEST_PROFILE_NAME.into());

let aws_region =
env::var("AWS_REGION").unwrap_or_else(|_| "object_store:fake_region".into());

env::set_var("AWS_PROFILE", &aws_profile);

let s3 = AmazonS3Builder::from_env()
.with_url(s3_url)
.with_region(aws_region.clone())
.build()
.unwrap();

let actual = &s3.client.config().region;
let expected = &aws_region;

assert_eq!(actual, expected);
}
}
111 changes: 111 additions & 0 deletions object_store/src/aws/profile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#![cfg(feature = "aws_profile")]

use aws_config::meta::region::ProvideRegion;
use aws_config::profile::profile_file::ProfileFiles;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::profile::ProfileFileRegionProvider;
use aws_config::provider_config::ProviderConfig;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::region::Region;
use futures::future::BoxFuture;
use std::sync::Arc;
use std::time::Instant;
use std::time::SystemTime;

use crate::aws::credential::CredentialProvider;
use crate::aws::AwsCredential;
use crate::client::token::{TemporaryToken, TokenCache};
use crate::Result;

#[cfg(test)]
pub static TEST_PROFILE_NAME: &str = "object_store:fake_profile";

#[cfg(test)]
pub static TEST_PROFILE_REGION: &str = "object_store:fake_region_from_profile";

#[derive(Debug)]
pub struct ProfileProvider {
name: String,
region: Option<String>,
cache: TokenCache<Arc<AwsCredential>>,
}

impl ProfileProvider {
pub fn new(name: String, region: Option<String>) -> Self {
Self {
name,
region,
cache: Default::default(),
}
}

#[cfg(test)]
fn profile_files(&self) -> ProfileFiles {
use aws_config::profile::profile_file::ProfileFileKind;

let config = format!(
"[profile {}]\nregion = {}",
TEST_PROFILE_NAME, TEST_PROFILE_REGION
);

ProfileFiles::builder()
.with_contents(ProfileFileKind::Config, config)
.build()
}

#[cfg(not(test))]
fn profile_files(&self) -> ProfileFiles {
ProfileFiles::default()
}

pub async fn get_region(&self) -> Option<String> {
if let Some(region) = self.region.clone() {
return Some(region);
}

let provider = ProfileFileRegionProvider::builder()
.profile_files(self.profile_files())
.profile_name(&self.name)
.build();

let region = provider.region().await;

region.map(|r| r.as_ref().to_owned())
}
}

impl CredentialProvider for ProfileProvider {
fn get_credential(&self) -> BoxFuture<'_, Result<Arc<AwsCredential>>> {
Box::pin(self.cache.get_or_insert_with(move || async move {
let region = self.region.clone().map(Region::new);

let config = ProviderConfig::default().with_region(region);

let credentials = ProfileFileCredentialsProvider::builder()
.configure(&config)
.profile_name(&self.name)
.build();

let c = credentials.provide_credentials().await.map_err(|source| {
crate::Error::Generic {
store: "S3",
source: Box::new(source),
}
})?;
let t_now = SystemTime::now();
let expiry = c
.expiry()
.and_then(|e| e.duration_since(t_now).ok())
.map(|ttl| Instant::now() + ttl);

Ok(TemporaryToken {
token: Arc::new(AwsCredential {
key_id: c.access_key_id().to_string(),
secret_key: c.secret_access_key().to_string(),
token: c.session_token().map(ToString::to_string),
}),
expiry,
})
}))
}
}