diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs index cc157dd41985..56468568b35f 100644 --- a/object_store/src/gcp/credential.rs +++ b/object_store/src/gcp/credential.rs @@ -17,16 +17,30 @@ use crate::client::retry::RetryExt; use crate::client::token::TemporaryToken; +use crate::ClientOptions; use crate::RetryConfig; +use async_trait::async_trait; use base64::prelude::BASE64_URL_SAFE_NO_PAD; use base64::Engine; +use futures::TryFutureExt; use reqwest::{Client, Method}; use ring::signature::RsaKeyPair; use snafu::{ResultExt, Snafu}; +use std::env; +use std::fs::File; +use std::io::BufReader; +use std::path::Path; use std::time::{Duration, Instant}; +use tracing::info; #[derive(Debug, Snafu)] pub enum Error { + #[snafu(display("Unable to open service account file: {}", source))] + OpenCredentials { source: std::io::Error }, + + #[snafu(display("Unable to decode service account file: {}", source))] + DecodeCredentials { source: serde_json::Error }, + #[snafu(display("No RSA key found in pem file"))] MissingKey, @@ -47,6 +61,12 @@ pub enum Error { #[snafu(display("Error getting token response body: {}", source))] TokenResponseBody { source: reqwest::Error }, + + #[snafu(display("A configuration file was passed in but was not used."))] + UnusedConfigurationFile, + + #[snafu(display("Error creating client: {}", source))] + Client { source: crate::Error }, } pub type Result = std::result::Result; @@ -104,6 +124,15 @@ struct TokenResponse { expires_in: u64, } +#[async_trait] +pub trait TokenProvider: std::fmt::Debug + Send + Sync { + async fn fetch_token( + &self, + client: &Client, + retry: &RetryConfig, + ) -> Result>; +} + /// Encapsulates the logic to perform an OAuth token challenge #[derive(Debug)] pub struct OAuthProvider { @@ -138,9 +167,12 @@ impl OAuthProvider { random: ring::rand::SystemRandom::new(), }) } +} +#[async_trait] +impl TokenProvider for OAuthProvider { /// Fetch a fresh token - pub async fn fetch_token( + async fn 
fetch_token( &self, client: &Client, retry: &RetryConfig, @@ -195,6 +227,69 @@ impl OAuthProvider { } } +fn read_credentials_file( + service_account_path: impl AsRef, +) -> Result +where + T: serde::de::DeserializeOwned, +{ + let file = File::open(service_account_path).context(OpenCredentialsSnafu)?; + let reader = BufReader::new(file); + serde_json::from_reader(reader).context(DecodeCredentialsSnafu) +} + +/// A deserialized `service-account-********.json`-file. +#[derive(serde::Deserialize, Debug)] +pub struct ServiceAccountCredentials { + /// The private key in RSA format. + pub private_key: String, + + /// The email address associated with the service account. + pub client_email: String, + + /// Base URL for GCS + #[serde(default = "default_gcs_base_url")] + pub gcs_base_url: String, + + /// Disable oauth and use empty tokens. + #[serde(default = "default_disable_oauth")] + pub disable_oauth: bool, +} + +pub fn default_gcs_base_url() -> String { + "https://storage.googleapis.com".to_owned() +} + +pub fn default_disable_oauth() -> bool { + false +} + +impl ServiceAccountCredentials { + /// Create a new [`ServiceAccountCredentials`] from a file. + pub fn from_file>(path: P) -> Result { + read_credentials_file(path) + } + + /// Create a new [`ServiceAccountCredentials`] from a string. + pub fn from_key(key: &str) -> Result { + serde_json::from_str(key).context(DecodeCredentialsSnafu) + } + + /// Create an [`OAuthProvider`] from this credentials struct. + pub fn token_provider( + self, + scope: &str, + audience: &str, + ) -> Result> { + Ok(Box::new(OAuthProvider::new( + self.client_email, + self.private_key, + scope.to_string(), + audience.to_string(), + )?) 
as Box) + } +} + /// Returns the number of seconds since unix epoch fn seconds_since_epoch() -> u64 { std::time::SystemTime::now() @@ -205,7 +300,7 @@ fn seconds_since_epoch() -> u64 { fn decode_first_rsa_key(private_key_pem: String) -> Result { use rustls_pemfile::Item; - use std::io::{BufReader, Cursor}; + use std::io::Cursor; let mut cursor = Cursor::new(private_key_pem); let mut reader = BufReader::new(&mut cursor); @@ -222,3 +317,159 @@ fn b64_encode_obj(obj: &T) -> Result { let string = serde_json::to_string(obj).context(EncodeSnafu)?; Ok(BASE64_URL_SAFE_NO_PAD.encode(string)) } + +/// A provider that uses the Google Cloud Platform metadata server to fetch a token. +/// +/// +#[derive(Debug, Default)] +pub struct InstanceCredentialProvider { + audience: String, + client: Client, +} + +impl InstanceCredentialProvider { + /// Create a new [`InstanceCredentialProvider`]. We need to control the client in order to enable http access, so save the options. + pub fn new>( + audience: T, + client_options: ClientOptions, + ) -> Result { + client_options + .with_allow_http(true) + .client() + .map(|client| Self { + audience: audience.into(), + client, + }) + .context(ClientSnafu) + } +} + +/// Make a request to the metadata server to fetch a token, using a given hostname. +async fn make_metadata_request( + client: &Client, + hostname: &str, + retry: &RetryConfig, + audience: &str, +) -> Result { + let url = format!( + "http://{}/computeMetadata/v1/instance/service-accounts/default/token", + hostname + ); + let response: TokenResponse = client + .request(Method::GET, url) + .header("Metadata-Flavor", "Google") + .query(&[("audience", audience)]) + .send_retry(retry) + .await + .context(TokenRequestSnafu)? + .json() + .await + .context(TokenResponseBodySnafu)?; + Ok(response) +} + +#[async_trait] +impl TokenProvider for InstanceCredentialProvider { + /// Fetch a token from the metadata server. 
+ /// Since the connection is local we need to enable http access and don't actually use the client object passed in. + async fn fetch_token( + &self, + _client: &Client, + retry: &RetryConfig, + ) -> Result> { + const METADATA_IP: &str = "169.254.169.254"; + const METADATA_HOST: &str = "metadata"; + + info!("fetching token from metadata server"); + let response = + make_metadata_request(&self.client, METADATA_HOST, retry, &self.audience) + .or_else(|_| { + make_metadata_request( + &self.client, + METADATA_IP, + retry, + &self.audience, + ) + }) + .await?; + let token = TemporaryToken { + token: response.access_token, + expiry: Instant::now() + Duration::from_secs(response.expires_in), + }; + Ok(token) + } +} + +/// A deserialized `application_default_credentials.json`-file. +/// +#[derive(serde::Deserialize, Debug)] +pub struct ApplicationDefaultCredentials { + client_id: String, + client_secret: String, + refresh_token: String, + #[serde(rename = "type")] + type_: String, +} + +impl ApplicationDefaultCredentials { + const DEFAULT_TOKEN_GCP_URI: &'static str = + "https://accounts.google.com/o/oauth2/token"; + const CREDENTIALS_PATH: &'static str = + ".config/gcloud/application_default_credentials.json"; + const EXPECTED_TYPE: &str = "authorized_user"; + + // Create a new application default credential in the following situations: + // 1. a file is passed in and the type matches. + // 2. without argument if the well-known configuration file is present. + pub fn new(path: Option<&str>) -> Result, Error> { + if let Some(path) = path { + if let Ok(credentials) = read_credentials_file::(path) { + if credentials.type_ == Self::EXPECTED_TYPE { + return Ok(Some(credentials)); + } + } + // Return an error if the path has not been used. 
+ return Err(Error::UnusedConfigurationFile); + } + if let Some(home) = env::var_os("HOME") { + let path = Path::new(&home).join(Self::CREDENTIALS_PATH); + + // It's expected for this file to not exist unless it has been explicitly configured by the user. + if path.try_exists().unwrap_or(false) { + return read_credentials_file::(path).map(Some); + } + } + Ok(None) + } +} + +#[async_trait] +impl TokenProvider for ApplicationDefaultCredentials { + async fn fetch_token( + &self, + client: &Client, + retry: &RetryConfig, + ) -> Result, Error> { + let body = [ + ("grant_type", "refresh_token"), + ("client_id", &self.client_id), + ("client_secret", &self.client_secret), + ("refresh_token", &self.refresh_token), + ]; + + let response = client + .request(Method::POST, Self::DEFAULT_TOKEN_GCP_URI) + .form(&body) + .send_retry(retry) + .await + .context(TokenRequestSnafu)? + .json::() + .await + .context(TokenResponseBodySnafu)?; + let token = TemporaryToken { + token: response.access_token, + expiry: Instant::now() + Duration::from_secs(response.expires_in), + }; + Ok(token) + } +} diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 28972c4a6636..871413b43801 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -30,8 +30,7 @@ //! consider implementing automatic clean up of unused parts that are older than one //! week. 
use std::collections::BTreeSet; -use std::fs::File; -use std::io::{self, BufReader}; +use std::io; use std::ops::Range; use std::str::FromStr; use std::sync::Arc; @@ -59,18 +58,15 @@ use crate::{ RetryConfig, }; -use credential::OAuthProvider; +use self::credential::{ + default_gcs_base_url, ApplicationDefaultCredentials, InstanceCredentialProvider, + ServiceAccountCredentials, TokenProvider, +}; mod credential; #[derive(Debug, Snafu)] enum Error { - #[snafu(display("Unable to open service account file: {}", source))] - OpenCredentials { source: std::io::Error }, - - #[snafu(display("Unable to decode service account file: {}", source))] - DecodeCredentials { source: serde_json::Error }, - #[snafu(display("Got invalid XML response for {} {}: {}", method, url, source))] InvalidXMLResponse { source: quick_xml::de::DeError, @@ -121,8 +117,8 @@ enum Error { #[snafu(display("Missing bucket name"))] MissingBucketName {}, - #[snafu(display("Missing service account path or key"))] - MissingServiceAccountPathOrKey, + #[snafu(display("Could not find either metadata credentials or configuration properties to initialize GCS credentials."))] + MissingCredentials, #[snafu(display( "One of service account path or service account key may be provided." @@ -185,32 +181,6 @@ impl From for super::Error { } } -/// A deserialized `service-account-********.json`-file. -#[derive(serde::Deserialize, Debug)] -struct ServiceAccountCredentials { - /// The private key in RSA format. - pub private_key: String, - - /// The email address associated with the service account. - pub client_email: String, - - /// Base URL for GCS - #[serde(default = "default_gcs_base_url")] - pub gcs_base_url: String, - - /// Disable oauth and use empty tokens. 
- #[serde(default = "default_disable_oauth")] - pub disable_oauth: bool, -} - -fn default_gcs_base_url() -> String { - "https://storage.googleapis.com".to_owned() -} - -fn default_disable_oauth() -> bool { - false -} - #[derive(serde::Deserialize, Debug)] #[serde(rename_all = "camelCase")] struct ListResponse { @@ -267,7 +237,7 @@ struct GoogleCloudStorageClient { client: Client, base_url: String, - oauth_provider: Option, + token_provider: Option>>, token_cache: TokenCache, bucket_name: String, @@ -282,11 +252,11 @@ struct GoogleCloudStorageClient { impl GoogleCloudStorageClient { async fn get_token(&self) -> Result { - if let Some(oauth_provider) = &self.oauth_provider { + if let Some(token_provider) = &self.token_provider { Ok(self .token_cache .get_or_insert_with(|| { - oauth_provider.fetch_token(&self.client, &self.retry_config) + token_provider.fetch_token(&self.client, &self.retry_config) }) .await .context(CredentialSnafu)?) @@ -779,14 +749,6 @@ impl ObjectStore for GoogleCloudStorage { } } -fn reader_credentials_file( - service_account_path: impl AsRef, -) -> Result { - let file = File::open(service_account_path).context(OpenCredentialsSnafu)?; - let reader = BufReader::new(file); - Ok(serde_json::from_reader(reader).context(DecodeCredentialsSnafu)?) -} - /// Configure a connection to Google Cloud Storage using the specified /// credentials. /// @@ -806,6 +768,7 @@ pub struct GoogleCloudStorageBuilder { url: Option, service_account_path: Option, service_account_key: Option, + application_credentials_path: Option, retry_config: RetryConfig, client_options: ClientOptions, } @@ -862,6 +825,11 @@ pub enum GoogleConfigKey { /// - `bucket` /// - `bucket_name` Bucket, + + /// Application credentials path + /// + /// See [`GoogleCloudStorageBuilder::with_application_credentials`]. 
+ ApplicationCredentials, } impl AsRef for GoogleConfigKey { @@ -870,6 +838,7 @@ impl AsRef for GoogleConfigKey { Self::ServiceAccount => "google_service_account", Self::ServiceAccountKey => "google_service_account_key", Self::Bucket => "google_bucket", + Self::ApplicationCredentials => "google_application_credentials", } } } @@ -889,6 +858,7 @@ impl FromStr for GoogleConfigKey { "google_bucket" | "google_bucket_name" | "bucket" | "bucket_name" => { Ok(Self::Bucket) } + "google_application_credentials" => Ok(Self::ApplicationCredentials), _ => Err(Error::UnknownConfigurationKey { key: s.into() }.into()), } } @@ -900,6 +870,7 @@ impl Default for GoogleCloudStorageBuilder { bucket_name: None, service_account_path: None, service_account_key: None, + application_credentials_path: None, retry_config: Default::default(), client_options: ClientOptions::new().with_allow_http(true), url: None, @@ -988,6 +959,9 @@ impl GoogleCloudStorageBuilder { self.service_account_key = Some(value.into()) } GoogleConfigKey::Bucket => self.bucket_name = Some(value.into()), + GoogleConfigKey::ApplicationCredentials => { + self.application_credentials_path = Some(value.into()) + } }; Ok(self) } @@ -1069,6 +1043,17 @@ impl GoogleCloudStorageBuilder { self } + /// Set the path to the application credentials file. + /// + /// + pub fn with_application_credentials( + mut self, + application_credentials_path: impl Into, + ) -> Self { + self.application_credentials_path = Some(application_credentials_path.into()); + self + } + /// Set the retry configuration pub fn with_retry(mut self, retry_config: RetryConfig) -> Self { self.retry_config = retry_config; @@ -1098,44 +1083,75 @@ impl GoogleCloudStorageBuilder { let client = self.client_options.client()?; - let credentials = match (self.service_account_path, self.service_account_key) { - (Some(path), None) => reader_credentials_file(path)?, - (None, Some(key)) => { - serde_json::from_str(&key).context(DecodeCredentialsSnafu)? 
- } - (None, None) => return Err(Error::MissingServiceAccountPathOrKey.into()), - (Some(_), Some(_)) => { - return Err(Error::ServiceAccountPathAndKeyProvided.into()) - } - }; + // First try to initialize from the service account information. + let service_account_credentials = + match (self.service_account_path, self.service_account_key) { + (Some(path), None) => Some( + ServiceAccountCredentials::from_file(path) + .context(CredentialSnafu)?, + ), + (None, Some(key)) => Some( + ServiceAccountCredentials::from_key(&key).context(CredentialSnafu)?, + ), + (None, None) => None, + (Some(_), Some(_)) => { + return Err(Error::ServiceAccountPathAndKeyProvided.into()) + } + }; + + // Then try to initialize from the application credentials file, or the environment. + let application_default_credentials = ApplicationDefaultCredentials::new( + self.application_credentials_path.as_deref(), + ) + .context(CredentialSnafu)?; + + let disable_oauth = service_account_credentials + .as_ref() + .map(|c| c.disable_oauth) + .unwrap_or(false); + + let gcs_base_url = service_account_credentials + .as_ref() + .map(|c| c.gcs_base_url.clone()) + .unwrap_or_else(default_gcs_base_url); // TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes let scope = "https://www.googleapis.com/auth/devstorage.full_control"; - let audience = "https://www.googleapis.com/oauth2/v4/token".to_string(); - - let oauth_provider = (!credentials.disable_oauth) - .then(|| { - OAuthProvider::new( - credentials.client_email, - credentials.private_key, - scope.to_string(), - audience, + let audience = "https://www.googleapis.com/oauth2/v4/token"; + + let token_provider = if disable_oauth { + None + } else { + let best_provider = if let Some(credentials) = service_account_credentials { + Some( + credentials + .token_provider(scope, audience) + .context(CredentialSnafu)?, ) - }) - .transpose() - .context(CredentialSnafu)?; + } else if let Some(credentials) = application_default_credentials { + 
Some(Box::new(credentials) as Box) + } else { + Some(Box::new( + InstanceCredentialProvider::new( + audience, + self.client_options.clone(), + ) + .context(CredentialSnafu)?, + ) as Box) + }; + + // A provider is required at this point, bail out if we don't have one. + Some(best_provider.ok_or(Error::MissingCredentials)?) + }; let encoded_bucket_name = percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string(); - // The cloud storage crate currently only supports authentication via - // environment variables. Set the environment variable explicitly so - // that we can optionally accept command line arguments instead. Ok(GoogleCloudStorage { client: Arc::new(GoogleCloudStorageClient { client, - base_url: credentials.gcs_base_url, - oauth_provider, + base_url: gcs_base_url, + token_provider: token_provider.map(Arc::new), token_cache: Default::default(), bucket_name, bucket_name_encoded: encoded_bucket_name,