Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Serializable;
import java.util.Map;
import org.apache.iceberg.aws.s3.VendedCredentialsProvider;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -66,21 +67,37 @@ public class AwsClientProperties implements Serializable {
*/
public static final String CLIENT_REGION = "client.region";

/**
* When set, the {@link VendedCredentialsProvider} will be used to fetch and refresh vended
* credentials from this endpoint.
*/
public static final String REFRESH_CREDENTIALS_ENDPOINT = "client.refresh-credentials-endpoint";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be a s3 FileIO property?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property needs to be set before FileIO is actually being configured and is similar to the client region. VendedCredentialsProvider is configured when AwsClientProperties#credentialsProvider(..) is called, so I don't think this property should be a FileIO property


/** Controls whether vended credentials should be refreshed or not. Defaults to true. */
public static final String REFRESH_CREDENTIALS_ENABLED = "client.refresh-credentials-enabled";

private String clientRegion;
private final String clientCredentialsProvider;
private final Map<String, String> clientCredentialsProviderProperties;
private final String refreshCredentialsEndpoint;
private final boolean refreshCredentialsEnabled;

public AwsClientProperties() {
this.clientRegion = null;
this.clientCredentialsProvider = null;
this.clientCredentialsProviderProperties = null;
this.refreshCredentialsEndpoint = null;
this.refreshCredentialsEnabled = true;
}

public AwsClientProperties(Map<String, String> properties) {
this.clientRegion = properties.get(CLIENT_REGION);
this.clientCredentialsProvider = properties.get(CLIENT_CREDENTIALS_PROVIDER);
this.clientCredentialsProviderProperties =
PropertyUtil.propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX);
this.refreshCredentialsEndpoint = properties.get(REFRESH_CREDENTIALS_ENDPOINT);
this.refreshCredentialsEnabled =
PropertyUtil.propertyAsBoolean(properties, REFRESH_CREDENTIALS_ENABLED, true);
}

public String clientRegion() {
Expand Down Expand Up @@ -122,11 +139,12 @@ public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T b
}

/**
* Returns a credentials provider instance. If params were set, we return a new credentials
* instance. If none of the params are set, we try to dynamically load the provided credentials
* provider class. Upon loading the class, we try to invoke {@code create(Map<String, String>)}
* static method. If that fails, we fall back to {@code create()}. If credential provider class
* wasn't set, we fall back to default credentials provider.
* Returns a credentials provider instance. If {@link #refreshCredentialsEndpoint} is set, an
* instance of {@link VendedCredentialsProvider} is returned. If params were set, we return a new
* credentials instance. If none of the params are set, we try to dynamically load the provided
* credentials provider class. Upon loading the class, we try to invoke {@code create(Map<String,
* String>)} static method. If that fails, we fall back to {@code create()}. If credential
* provider class wasn't set, we fall back to default credentials provider.
*
* @param accessKeyId the AWS access key ID
* @param secretAccessKey the AWS secret access key
Expand All @@ -136,6 +154,12 @@ public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T b
@SuppressWarnings("checkstyle:HiddenField")
public AwsCredentialsProvider credentialsProvider(
String accessKeyId, String secretAccessKey, String sessionToken) {
if (refreshCredentialsEnabled && !Strings.isNullOrEmpty(refreshCredentialsEndpoint)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we log a warning if the endpoint is set but refreshCredentialsEnabled is false? I don't think we should fail but this is probably something a user would want to be aware of.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure adding a warning adds a lot of value. It's valid to have the server send you back an endpoint + refresh enabled flag that you then override for cases like Kafka connect. I'd say let's go without a warning for now unless this becomes a place of confusion

clientCredentialsProviderProperties.put(
VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
return credentialsProvider(VendedCredentialsProvider.class.getName());
}

if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) {
if (Strings.isNullOrEmpty(sessionToken)) {
return StaticCredentialsProvider.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ public class S3FileIOProperties implements Serializable {
*/
public static final String SESSION_TOKEN = "s3.session-token";

/**
* Configure the expiration time in millis of the static session token used to access S3FileIO.
* This expiration time is currently only used in {@link VendedCredentialsProvider} for refreshing
* vended credentials.
*/
static final String SESSION_TOKEN_EXPIRES_AT_MS = "s3.session-token-expires-at-ms";

/**
* Enable to make S3FileIO, to make cross-region call to the region specified in the ARN of an
* access point.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws.s3;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.SdkAutoCloseable;
import software.amazon.awssdk.utils.cache.CachedSupplier;
import software.amazon.awssdk.utils.cache.RefreshResult;

public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAutoCloseable {
Copy link
Contributor Author

@nastra nastra Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jackye1995 is this credential provider similar to the one you mentioned a while ago where you guys had a custom credential provider that would always call loadTable()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this looks similar.

public static final String URI = "credentials.uri";
private volatile HTTPClient client;
private final Map<String, String> properties;
private final CachedSupplier<AwsCredentials> credentialCache;

private VendedCredentialsProvider(Map<String, String> properties) {
Preconditions.checkArgument(null != properties, "Invalid properties: null");
Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null");
this.properties = properties;
this.credentialCache =
CachedSupplier.builder(this::refreshCredential)
.cachedValueName(VendedCredentialsProvider.class.getName())
.build();
}

@Override
public AwsCredentials resolveCredentials() {
return credentialCache.get();
}

@Override
public void close() {
IoUtils.closeQuietly(client, null);
credentialCache.close();
}

public static VendedCredentialsProvider create(Map<String, String> properties) {
return new VendedCredentialsProvider(properties);
}

private RESTClient httpClient() {
if (null == client) {
synchronized (this) {
if (null == client) {
client = HTTPClient.builder(properties).uri(properties.get(URI)).build();
}
}
}

return client;
}

private LoadCredentialsResponse fetchCredentials() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are 2 concepts that would be beneficial to be added, a staleTime and a prefetchTime. You could check the AWS SDK StsCredentialsProvider for how that is implemented. But this prevents edge cases like the credentials is loaded at almost the expiration time and cause errors downstream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already being done further below:

return RefreshResult.builder(
            (AwsCredentials)
                AwsSessionCredentials.builder()
                    .accessKeyId(accessKeyId)
                    .secretAccessKey(secretAccessKey)
                    .sessionToken(sessionToken)
                    .expirationTime(expiresAt)
                    .build())
        .staleTime(expiresAt)
        .prefetchTime(prefetchAt)
        .build();

return httpClient()
.get(
properties.get(URI),
null,
LoadCredentialsResponse.class,
OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)),
Comment on lines +90 to +91
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming the Token here is same short lived token used by RestCatalog instance, are we planning to handle token refresh post its expiration with in the FileIO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is indeed an issue that I was planning to address when I wrote the first version of this many months ago but simply forgot to get back to. thanks for raising this @ChaladiMohanVamsi

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for the initial support for refresh, we can assume that we're bound by the token lifetime for refreshes. It's not perfect, but I'd like to see the resolution of some of the AuthManger refactor before settling on a solution. We're not regressing at this point, so I'm ok with leaving this as is and addressing in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, this is on my todo list for after the AuthManager refactor.

ErrorHandlers.defaultErrorHandler());
}

private RefreshResult<AwsCredentials> refreshCredential() {
LoadCredentialsResponse response = fetchCredentials();

List<Credential> s3Credentials =
response.credentials().stream()
.filter(c -> c.prefix().startsWith("s3"))
.collect(Collectors.toList());
Comment on lines +98 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This imho still doesn't addresses the problem of wiring the credential for the right prefix ?
for ex : S3 rest server returned prefix as "s3://bucket/prefix-1"
but the call was for "s3://bucket/prefix-2", unless there is an enforcement from REST "that it will return only longest common prefix in the response"

I would recommend rather than using starting with "S3" let make it equal to "S3" for now to make sure the client doesn't mess around, what do you think ?

@danielcweeks @RussellSpitzer @amogh-jahagirdar @nastra

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@singhpk234 for how the implementation enforces that there's really only a single credential being sent back by the server. I'll be working on supporting and selecting the "right" credential when the server sents back multiple in a follow-up

Copy link
Contributor

@singhpk234 singhpk234 Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for how the implementation enforces that there's really only a single credential being sent back by the server

[doubt] how are we enforcing this ? for ex a rest server can send a credential for only one prefix but still for a diff prefix than what being asked for, are you suggesting that for now, rest server should only send back for exactly "s3" prefix ? if yes how are we enforcing this in rest for the meanwhile ?
Is strategy for now to let if fail ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catalog has the responsibility of returning a credential with a scoped policy that provides the appropriate access for all prefixes. This isn't about the spec or client trying to enforce that behavior. Either the client will have access or not, that's up to the catalog.

Copy link
Contributor

@singhpk234 singhpk234 Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catalog has the responsibility of returning a credential with a scoped policy that provides the appropriate access for all prefixes

I see, so that fact that only one credential is being returned from the catalog, it itself means that its best prefix that could fit in into all the request the client is allowed make. Make sense ! so we would indeed fail at client trying for an un-acessible prefix from S3 end.


Preconditions.checkState(!s3Credentials.isEmpty(), "Invalid S3 Credentials: empty");
Preconditions.checkState(
s3Credentials.size() == 1, "Invalid S3 Credentials: only one S3 credential should exist");

Credential s3Credential = s3Credentials.get(0);
checkCredential(s3Credential, S3FileIOProperties.ACCESS_KEY_ID);
checkCredential(s3Credential, S3FileIOProperties.SECRET_ACCESS_KEY);
checkCredential(s3Credential, S3FileIOProperties.SESSION_TOKEN);
checkCredential(s3Credential, S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS);

String accessKeyId = s3Credential.config().get(S3FileIOProperties.ACCESS_KEY_ID);
String secretAccessKey = s3Credential.config().get(S3FileIOProperties.SECRET_ACCESS_KEY);
String sessionToken = s3Credential.config().get(S3FileIOProperties.SESSION_TOKEN);
String tokenExpiresAtMillis =
s3Credential.config().get(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS);
Instant expiresAt = Instant.ofEpochMilli(Long.parseLong(tokenExpiresAtMillis));
Instant prefetchAt = expiresAt.minus(5, ChronoUnit.MINUTES);

return RefreshResult.builder(
(AwsCredentials)
AwsSessionCredentials.builder()
.accessKeyId(accessKeyId)
.secretAccessKey(secretAccessKey)
.sessionToken(sessionToken)
.expirationTime(expiresAt)
.build())
.staleTime(expiresAt)
.prefetchTime(prefetchAt)
.build();
}

private void checkCredential(Credential credential, String property) {
Preconditions.checkState(
credential.config().containsKey(property), "Invalid S3 Credentials: %s not set", property);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.iceberg.aws.s3.VendedCredentialsProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand All @@ -29,6 +31,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

Expand Down Expand Up @@ -111,4 +114,30 @@ public void testSessionCredentialsConfiguration() {
.as("The secret access key should be the same as the one set by tag SECRET_ACCESS_KEY")
.isEqualTo("secret");
}

@Test
public void refreshCredentialsEndpoint() {
AwsClientProperties awsClientProperties =
new AwsClientProperties(
ImmutableMap.of(
AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
"http://localhost:1234/v1/credentials"));

assertThat(awsClientProperties.credentialsProvider("key", "secret", "token"))
.isInstanceOf(VendedCredentialsProvider.class);
}

@Test
public void refreshCredentialsEndpointSetButRefreshDisabled() {
AwsClientProperties awsClientProperties =
new AwsClientProperties(
ImmutableMap.of(
AwsClientProperties.REFRESH_CREDENTIALS_ENABLED,
"false",
AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
"http://localhost:1234/v1/credentials"));

assertThat(awsClientProperties.credentialsProvider("key", "secret", "token"))
.isInstanceOf(StaticCredentialsProvider.class);
}
}
Loading