Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional GCP authentication #3541

Merged
merged 4 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 253 additions & 2 deletions object_store/src/gcp/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,30 @@

use crate::client::retry::RetryExt;
use crate::client::token::TemporaryToken;
use crate::ClientOptions;
use crate::RetryConfig;
use async_trait::async_trait;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use base64::Engine;
use futures::TryFutureExt;
use reqwest::{Client, Method};
use ring::signature::RsaKeyPair;
use snafu::{ResultExt, Snafu};
use std::env;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use std::time::{Duration, Instant};
use tracing::info;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to open service account file: {}", source))]
OpenCredentials { source: std::io::Error },

#[snafu(display("Unable to decode service account file: {}", source))]
DecodeCredentials { source: serde_json::Error },

#[snafu(display("No RSA key found in pem file"))]
MissingKey,

Expand All @@ -47,6 +61,12 @@ pub enum Error {

#[snafu(display("Error getting token response body: {}", source))]
TokenResponseBody { source: reqwest::Error },

#[snafu(display("A configuration file was passed in but was not used."))]
UnusedConfigurationFile,

#[snafu(display("Error creating client: {}", source))]
Client { source: crate::Error },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -104,6 +124,15 @@ struct TokenResponse {
expires_in: u64,
}

/// Common interface implemented by the token sources in this module.
#[async_trait]
pub trait TokenProvider: std::fmt::Debug + Send + Sync {
/// Fetch a token, given an HTTP `client` and the `retry` configuration to apply
/// to the request. The result is a [`TemporaryToken`] carrying the token string.
async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
) -> Result<TemporaryToken<String>>;
}

/// Encapsulates the logic to perform an OAuth token challenge
#[derive(Debug)]
pub struct OAuthProvider {
Expand Down Expand Up @@ -138,9 +167,12 @@ impl OAuthProvider {
random: ring::rand::SystemRandom::new(),
})
}
}

#[async_trait]
impl TokenProvider for OAuthProvider {
/// Fetch a fresh token
pub async fn fetch_token(
async fn fetch_token(
&self,
client: &Client,
retry: &RetryConfig,
Expand Down Expand Up @@ -195,6 +227,69 @@ impl OAuthProvider {
}
}

/// Open the file at `service_account_path` and deserialize its JSON contents into `T`.
///
/// # Errors
///
/// Returns [`Error::OpenCredentials`] when the file cannot be opened and
/// [`Error::DecodeCredentials`] when its contents cannot be deserialized into `T`.
fn read_credentials_file<T>(
    service_account_path: impl AsRef<std::path::Path>,
) -> Result<T>
where
    T: serde::de::DeserializeOwned,
{
    // Buffer the file so that serde_json does not read it byte by byte.
    let reader = File::open(service_account_path)
        .map(BufReader::new)
        .context(OpenCredentialsSnafu)?;
    serde_json::from_reader(reader).context(DecodeCredentialsSnafu)
}

/// A deserialized `service-account-********.json`-file.
#[derive(serde::Deserialize, Debug)]
pub struct ServiceAccountCredentials {
/// The private key in RSA format.
pub private_key: String,

/// The email address associated with the service account.
pub client_email: String,

/// Base URL for GCS
///
/// Filled in from [`default_gcs_base_url`] when the field is absent from the input.
#[serde(default = "default_gcs_base_url")]
pub gcs_base_url: String,

/// Disable oauth and use empty tokens.
///
/// Filled in from [`default_disable_oauth`] when the field is absent from the input.
#[serde(default = "default_disable_oauth")]
pub disable_oauth: bool,
}

/// Base URL used for GCS when the credentials do not specify `gcs_base_url`.
pub fn default_gcs_base_url() -> String {
    String::from("https://storage.googleapis.com")
}

/// Value used for `disable_oauth` when the credentials do not specify it:
/// oauth stays enabled.
pub fn default_disable_oauth() -> bool {
false
}

impl ServiceAccountCredentials {
    /// Load [`ServiceAccountCredentials`] by deserializing the JSON file at `path`.
    pub fn from_file<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
        read_credentials_file::<Self>(path)
    }

    /// Deserialize [`ServiceAccountCredentials`] from `key`, a JSON document held
    /// in memory.
    pub fn from_key(key: &str) -> Result<Self> {
        serde_json::from_str::<Self>(key).context(DecodeCredentialsSnafu)
    }

    /// Consume these credentials and build a boxed [`OAuthProvider`] for the
    /// given `scope` and `audience`.
    pub fn token_provider(
        self,
        scope: &str,
        audience: &str,
    ) -> Result<Box<dyn TokenProvider>> {
        let provider = OAuthProvider::new(
            self.client_email,
            self.private_key,
            scope.to_owned(),
            audience.to_owned(),
        )?;
        Ok(Box::new(provider))
    }
}

/// Returns the number of seconds since unix epoch
fn seconds_since_epoch() -> u64 {
std::time::SystemTime::now()
Expand All @@ -205,7 +300,7 @@ fn seconds_since_epoch() -> u64 {

fn decode_first_rsa_key(private_key_pem: String) -> Result<RsaKeyPair> {
use rustls_pemfile::Item;
use std::io::{BufReader, Cursor};
use std::io::Cursor;

let mut cursor = Cursor::new(private_key_pem);
let mut reader = BufReader::new(&mut cursor);
Expand All @@ -222,3 +317,159 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
let string = serde_json::to_string(obj).context(EncodeSnafu)?;
Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
}

/// A provider that uses the Google Cloud Platform metadata server to fetch a token.
///
/// <https://cloud.google.com/docs/authentication/get-id-token#metadata-server>
#[derive(Debug, Default)]
pub struct InstanceCredentialProvider {
/// Sent as the `audience` query parameter of the metadata request.
audience: String,
/// Client owned by the provider and used for the metadata requests instead of
/// the client handed to `fetch_token`.
client: Client,
}

impl InstanceCredentialProvider {
    /// Create a new [`InstanceCredentialProvider`].
    ///
    /// The provider builds and keeps its own client from `client_options`,
    /// because it needs control over the client in order to enable http access.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Client`] when the client cannot be created.
    pub fn new<T: Into<String>>(
        audience: T,
        client_options: ClientOptions,
    ) -> Result<Self> {
        let client = client_options
            .with_allow_http(true)
            .client()
            .context(ClientSnafu)?;

        Ok(Self {
            audience: audience.into(),
            client,
        })
    }
}

/// Make a request to the metadata server to fetch a token, using a given hostname.
///
/// The request is a plain-http `GET` carrying the `Metadata-Flavor: Google` header
/// and `audience` as a query parameter; it is sent with the `retry` configuration.
///
/// # Errors
///
/// Returns [`Error::TokenRequest`] when the request fails and
/// [`Error::TokenResponseBody`] when the response body cannot be decoded.
async fn make_metadata_request(
    client: &Client,
    hostname: &str,
    retry: &RetryConfig,
    audience: &str,
) -> Result<TokenResponse> {
    let url = format!(
        "http://{hostname}/computeMetadata/v1/instance/service-accounts/default/token"
    );
    let request = client
        .request(Method::GET, url)
        .header("Metadata-Flavor", "Google")
        .query(&[("audience", audience)]);

    let response = request.send_retry(retry).await.context(TokenRequestSnafu)?;
    response
        .json::<TokenResponse>()
        .await
        .context(TokenResponseBodySnafu)
}

#[async_trait]
impl TokenProvider for InstanceCredentialProvider {
    /// Fetch a token from the metadata server.
    ///
    /// The `metadata` hostname is tried first; if that request fails, the
    /// metadata IP address is tried instead. The client passed in is ignored:
    /// the connection is local and needs http access, so the provider's own
    /// client is used.
    async fn fetch_token(
        &self,
        _client: &Client,
        retry: &RetryConfig,
    ) -> Result<TemporaryToken<String>> {
        const METADATA_IP: &str = "169.254.169.254";
        const METADATA_HOST: &str = "metadata";

        info!("fetching token from metadata server");

        let by_hostname =
            make_metadata_request(&self.client, METADATA_HOST, retry, &self.audience);
        // The error of the first attempt is discarded; only the fallback's
        // error is reported to the caller.
        let response = by_hostname
            .or_else(|_| {
                make_metadata_request(&self.client, METADATA_IP, retry, &self.audience)
            })
            .await?;

        Ok(TemporaryToken {
            token: response.access_token,
            expiry: Instant::now() + Duration::from_secs(response.expires_in),
        })
    }
}

/// A deserialized `application_default_credentials.json`-file.
/// <https://cloud.google.com/docs/authentication/application-default-credentials#personal>
#[derive(serde::Deserialize, Debug)]
pub struct ApplicationDefaultCredentials {
// Sent as `client_id` when requesting a token.
client_id: String,
// Sent as `client_secret` when requesting a token.
client_secret: String,
// Sent as `refresh_token` when requesting a token.
refresh_token: String,
// The `type` entry of the file; compared against `EXPECTED_TYPE` when a path
// is passed in explicitly.
#[serde(rename = "type")]
type_: String,
}

impl ApplicationDefaultCredentials {
const DEFAULT_TOKEN_GCP_URI: &'static str =
"https://accounts.google.com/o/oauth2/token";
const CREDENTIALS_PATH: &'static str =
".config/gcloud/application_default_credentials.json";
const EXPECTED_TYPE: &str = "authorized_user";

// Create a new application default credential in the following situations:
// 1. a file is passed in and the type matches.
// 2. without argument if the well-known configuration file is present.
pub fn new(path: Option<&str>) -> Result<Option<Self>, Error> {
if let Some(path) = path {
if let Ok(credentials) = read_credentials_file::<Self>(path) {
if credentials.type_ == Self::EXPECTED_TYPE {
return Ok(Some(credentials));
}
}
// Return an error if the path has not been used.
return Err(Error::UnusedConfigurationFile);
}
if let Some(home) = env::var_os("HOME") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is potentially fragile, but adding an additional dependency is probably not worth it so LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dirs look like a good option but you mentioned wanting to keep the dependencies lean :)

let path = Path::new(&home).join(Self::CREDENTIALS_PATH);

// It's expected for this file to not exist unless it has been explicitly configured by the user.
if path.try_exists().unwrap_or(false) {
return read_credentials_file::<Self>(path).map(Some);
}
}
Ok(None)
}
}

#[async_trait]
impl TokenProvider for ApplicationDefaultCredentials {
    /// Exchange the stored refresh token for a fresh token.
    ///
    /// Posts a `refresh_token` grant as form data to `DEFAULT_TOKEN_GCP_URI`
    /// using `client` and the `retry` configuration.
    ///
    /// # Errors
    ///
    /// Returns [`Error::TokenRequest`] when the request fails and
    /// [`Error::TokenResponseBody`] when the response body cannot be decoded.
    async fn fetch_token(
        &self,
        client: &Client,
        retry: &RetryConfig,
    ) -> Result<TemporaryToken<String>, Error> {
        let form = [
            ("grant_type", "refresh_token"),
            ("client_id", self.client_id.as_str()),
            ("client_secret", self.client_secret.as_str()),
            ("refresh_token", self.refresh_token.as_str()),
        ];

        let request = client
            .request(Method::POST, Self::DEFAULT_TOKEN_GCP_URI)
            .form(&form);
        let reply = request.send_retry(retry).await.context(TokenRequestSnafu)?;
        let parsed = reply
            .json::<TokenResponse>()
            .await
            .context(TokenResponseBodySnafu)?;

        Ok(TemporaryToken {
            token: parsed.access_token,
            expiry: Instant::now() + Duration::from_secs(parsed.expires_in),
        })
    }
}
Loading