From 4f5de90bbba9abdf36f94b8821bf57bc51fe96a6 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Fri, 18 Oct 2024 17:09:56 +0200 Subject: [PATCH 1/4] AWS: Refresh vended credentials --- .../iceberg/aws/AwsClientProperties.java | 36 +- .../iceberg/aws/s3/S3FileIOProperties.java | 7 + .../aws/s3/VendedCredentialsProvider.java | 141 ++++++++ .../iceberg/aws/AwsClientPropertiesTest.java | 29 ++ .../aws/s3/TestVendedCredentialsProvider.java | 323 ++++++++++++++++++ 5 files changed, 531 insertions(+), 5 deletions(-) create mode 100644 aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java create mode 100644 aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java index 0c91f8685ae9..0d659a3fa342 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.Map; +import org.apache.iceberg.aws.s3.VendedCredentialsProvider; import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -66,14 +67,29 @@ public class AwsClientProperties implements Serializable { */ public static final String CLIENT_REGION = "client.region"; + /** + * Configure the endpoint to be used to fetch and refresh vended credentials. + * + *

When set, the {@link VendedCredentialsProvider} will be used to fetch and refresh vended + * credentials. + */ + public static final String REFRESH_CREDENTIALS_ENDPOINT = "client.refresh-credentials-endpoint"; + + /** Controls whether vended credentials should be refreshed or not. Defaults to true. */ + public static final String REFRESH_CREDENTIALS_ENABLED = "client.refresh-credentials-enabled"; + private String clientRegion; private final String clientCredentialsProvider; private final Map clientCredentialsProviderProperties; + private final String refreshCredentialsEndpoint; + private final boolean refreshCredentialsEnabled; public AwsClientProperties() { this.clientRegion = null; this.clientCredentialsProvider = null; this.clientCredentialsProviderProperties = null; + this.refreshCredentialsEndpoint = null; + this.refreshCredentialsEnabled = true; } public AwsClientProperties(Map properties) { @@ -81,6 +97,9 @@ public AwsClientProperties(Map properties) { this.clientCredentialsProvider = properties.get(CLIENT_CREDENTIALS_PROVIDER); this.clientCredentialsProviderProperties = PropertyUtil.propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX); + this.refreshCredentialsEndpoint = properties.get(REFRESH_CREDENTIALS_ENDPOINT); + this.refreshCredentialsEnabled = + PropertyUtil.propertyAsBoolean(properties, REFRESH_CREDENTIALS_ENABLED, true); } public String clientRegion() { @@ -122,11 +141,12 @@ public void applyClientCredentialConfigurations(T b } /** - * Returns a credentials provider instance. If params were set, we return a new credentials - * instance. If none of the params are set, we try to dynamically load the provided credentials - * provider class. Upon loading the class, we try to invoke {@code create(Map)} - * static method. If that fails, we fall back to {@code create()}. If credential provider class - * wasn't set, we fall back to default credentials provider. + * Returns a credentials provider instance. If {@link #refreshCredentialsEndpoint} is set, an + * instance of {@link VendedCredentialsProvider} is returned. If params were set, we return a new + * credentials instance. If none of the params are set, we try to dynamically load the provided + * credentials provider class. Upon loading the class, we try to invoke {@code create(Map)} static method. If that fails, we fall back to {@code create()}. If credential + * provider class wasn't set, we fall back to default credentials provider. * * @param accessKeyId the AWS access key ID * @param secretAccessKey the AWS secret access key @@ -136,6 +156,12 @@ public void applyClientCredentialConfigurations(T b @SuppressWarnings("checkstyle:HiddenField") public AwsCredentialsProvider credentialsProvider( String accessKeyId, String secretAccessKey, String sessionToken) { + if (refreshCredentialsEnabled && !Strings.isNullOrEmpty(refreshCredentialsEndpoint)) { + clientCredentialsProviderProperties.put( + VendedCredentialsProvider.URI, refreshCredentialsEndpoint); + return credentialsProvider(VendedCredentialsProvider.class.getName()); + } + if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) { if (Strings.isNullOrEmpty(sessionToken)) { return StaticCredentialsProvider.create( diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index 5da758704ae5..8d97b9d1bf20 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -225,6 +225,13 @@ public class S3FileIOProperties implements Serializable { */ public static final String SESSION_TOKEN = "s3.session-token"; + /** + * Configure the expiration time in millis of the static session token used to access S3FileIO. + * This expiration time is currently only used in {@link VendedCredentialsProvider} for refreshing + * vended credentials. + */ + static final String SESSION_TOKEN_EXPIRES_AT_MS = "s3.session-token-expires-at-ms"; + /** * Enable to make S3FileIO, to make cross-region call to the region specified in the ARN of an * access point. diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java new file mode 100644 index 000000000000..cf667284d813 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.rest.ErrorHandlers; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.credentials.Credential; +import org.apache.iceberg.rest.responses.LoadCredentialsResponse; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.utils.IoUtils; +import software.amazon.awssdk.utils.SdkAutoCloseable; +import software.amazon.awssdk.utils.cache.CachedSupplier; +import software.amazon.awssdk.utils.cache.RefreshResult; + +public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAutoCloseable { + public static final String URI = "credentials.uri"; + private volatile HTTPClient client; + private final Map properties; + private final CachedSupplier credentialCache; + + private VendedCredentialsProvider(Map properties) { + Preconditions.checkArgument(null != properties, "Invalid properties: null"); + Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null"); + this.properties = properties; + this.credentialCache = + CachedSupplier.builder(this::refreshCredential) + .cachedValueName(VendedCredentialsProvider.class.getName()) + .build(); + } + + @Override + public AwsCredentials resolveCredentials() { + return credentialCache.get(); + } + + @Override + public void close() { + IoUtils.closeQuietly(client, null); + credentialCache.close(); + } + + public static VendedCredentialsProvider create(Map properties) { + return new VendedCredentialsProvider(properties); + } + + private RESTClient httpClient() { + if (null == client) { + synchronized (this) { + if (null == client) { + client = HTTPClient.builder(properties).uri(properties.get(URI)).build(); + } + } + } + + return client; + } + + private LoadCredentialsResponse fetchCredentials() { + return httpClient() + .get( + properties.get(URI), + null, + LoadCredentialsResponse.class, + OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)), + ErrorHandlers.defaultErrorHandler()); + } + + private RefreshResult refreshCredential() { + LoadCredentialsResponse response = fetchCredentials(); + + List s3Credentials = + response.credentials().stream() + .filter(c -> c.prefix().startsWith("s3")) + .collect(Collectors.toList()); + + Preconditions.checkState(!s3Credentials.isEmpty(), "Invalid S3 Credentials: empty"); + + Optional credentialWithPrefix = + s3Credentials.stream().max(Comparator.comparingInt(c -> c.prefix().length())); + + Credential s3Credential = credentialWithPrefix.orElseGet(() -> s3Credentials.get(0)); + checkCredential(s3Credential, S3FileIOProperties.ACCESS_KEY_ID); + checkCredential(s3Credential, S3FileIOProperties.SECRET_ACCESS_KEY); + checkCredential(s3Credential, S3FileIOProperties.SESSION_TOKEN); + checkCredential(s3Credential, S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS); + + String accessKeyId = s3Credential.config().get(S3FileIOProperties.ACCESS_KEY_ID); + String secretAccessKey = s3Credential.config().get(S3FileIOProperties.SECRET_ACCESS_KEY); + String sessionToken = s3Credential.config().get(S3FileIOProperties.SESSION_TOKEN); + String tokenExpiresAtMillis = + s3Credential.config().get(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS); + Instant expiresAt = Instant.ofEpochMilli(Long.parseLong(tokenExpiresAtMillis)); + Instant prefetchAt = expiresAt.minus(5, ChronoUnit.MINUTES); + + return RefreshResult.builder( + (AwsCredentials) + AwsSessionCredentials.builder() + .accessKeyId(accessKeyId) + .secretAccessKey(secretAccessKey) + .sessionToken(sessionToken) + .expirationTime(expiresAt) + .build()) + .staleTime(expiresAt) + .prefetchTime(prefetchAt) + .build(); + } + + private void checkCredential(Credential credential, String property) { + Preconditions.checkState( + credential.config().containsKey(property), "Invalid S3 Credentials: %s not set", property); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/AwsClientPropertiesTest.java b/aws/src/test/java/org/apache/iceberg/aws/AwsClientPropertiesTest.java index c318538d9509..5cf9dd810c9f 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/AwsClientPropertiesTest.java +++ b/aws/src/test/java/org/apache/iceberg/aws/AwsClientPropertiesTest.java @@ -21,6 +21,8 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Map; +import org.apache.iceberg.aws.s3.VendedCredentialsProvider; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -29,6 +31,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3ClientBuilder; @@ -111,4 +114,30 @@ public void testSessionCredentialsConfiguration() { .as("The secret access key should be the same as the one set by tag SECRET_ACCESS_KEY") .isEqualTo("secret"); } + + @Test + public void refreshCredentialsEndpoint() { + AwsClientProperties awsClientProperties = + new AwsClientProperties( + ImmutableMap.of( + AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, + "http://localhost:1234/v1/credentials")); + + assertThat(awsClientProperties.credentialsProvider("key", "secret", "token")) + .isInstanceOf(VendedCredentialsProvider.class); + } + + @Test + public void refreshCredentialsEndpointSetButRefreshDisabled() { + AwsClientProperties awsClientProperties = + new AwsClientProperties( + ImmutableMap.of( + AwsClientProperties.REFRESH_CREDENTIALS_ENABLED, + "false", + AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, + "http://localhost:1234/v1/credentials")); + + assertThat(awsClientProperties.credentialsProvider("key", "secret", "token")) + .isInstanceOf(StaticCredentialsProvider.class); + } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java new file mode 100644 index 000000000000..9934c35a0503 --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws.s3; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.HttpMethod; +import org.apache.iceberg.rest.credentials.Credential; +import org.apache.iceberg.rest.credentials.ImmutableCredential; +import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse; +import org.apache.iceberg.rest.responses.LoadCredentialsResponse; +import org.apache.iceberg.rest.responses.LoadCredentialsResponseParser; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.verify.VerificationTimes; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; + +public class TestVendedCredentialsProvider { + + private static final int PORT = 3232; + private static final String URI = String.format("http://127.0.0.1:%d/v1/credentials", PORT); + private static ClientAndServer mockServer; + + @BeforeAll + public static void beforeAll() { + mockServer = startClientAndServer(PORT); + } + + @AfterAll + public static void stopServer() { + mockServer.stop(); + } + + @BeforeEach + public void before() { + mockServer.reset(); + } + + @Test + public void invalidOrMissingUri() { + assertThatThrownBy(() -> VendedCredentialsProvider.create(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid properties: null"); + assertThatThrownBy(() -> VendedCredentialsProvider.create(ImmutableMap.of())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid URI: null"); + + try (VendedCredentialsProvider provider = + VendedCredentialsProvider.create( + ImmutableMap.of(VendedCredentialsProvider.URI, "invalid uri"))) { + assertThatThrownBy(provider::resolveCredentials) + .isInstanceOf(RESTException.class) + .hasMessageStartingWith("Failed to create request URI from base invalid uri"); + } + } + + @Test + public void noS3CredentialsInResponse() { + HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); + + HttpResponse mockResponse = + response( + LoadCredentialsResponseParser.toJson( + ImmutableLoadCredentialsResponse.builder().build())) + .withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + try (VendedCredentialsProvider provider = + VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { + assertThatThrownBy(provider::resolveCredentials) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid S3 Credentials: empty"); + } + } + + @Test + public void accessKeyIdAndSecretAccessKeyWithoutToken() { + HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); + LoadCredentialsResponse response = + ImmutableLoadCredentialsResponse.builder() + .addCredentials( + ImmutableCredential.builder() + .prefix("s3") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey")) + .build()) + .build(); + + HttpResponse mockResponse = + response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + try (VendedCredentialsProvider provider = + VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { + assertThatThrownBy(provider::resolveCredentials) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid S3 Credentials: s3.session-token not set"); + } + } + + @Test + public void expirationNotSet() { + HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); + LoadCredentialsResponse response = + ImmutableLoadCredentialsResponse.builder() + .addCredentials( + ImmutableCredential.builder() + .prefix("s3") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey", + S3FileIOProperties.SESSION_TOKEN, + "sessionToken")) + .build()) + .build(); + + HttpResponse mockResponse = + response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + try (VendedCredentialsProvider provider = + VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { + assertThatThrownBy(provider::resolveCredentials) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid S3 Credentials: s3.session-token-expires-at-ms not set"); + } + } + + @Test + public void nonExpiredToken() { + HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); + Credential credential = + ImmutableCredential.builder() + .prefix("s3") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey", + S3FileIOProperties.SESSION_TOKEN, + "sessionToken", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli()))) + .build(); + LoadCredentialsResponse response = + ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build(); + + HttpResponse mockResponse = + response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + try (VendedCredentialsProvider provider = + VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { + AwsCredentials awsCredentials = provider.resolveCredentials(); + + verifyCredentials(awsCredentials, credential); + + for (int i = 0; i < 5; i++) { + // resolving credentials multiple times should not hit the credentials endpoint again + assertThat(provider.resolveCredentials()).isSameAs(awsCredentials); + } + } + + mockServer.verify(mockRequest, VerificationTimes.once()); + } + + @Test + public void expiredToken() { + HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); + Credential credential = + ImmutableCredential.builder() + .prefix("s3") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey", + S3FileIOProperties.SESSION_TOKEN, + "sessionToken", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + Long.toString(Instant.now().minus(1, ChronoUnit.MINUTES).toEpochMilli()))) + .build(); + LoadCredentialsResponse response = + ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build(); + + HttpResponse mockResponse = + response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + try (VendedCredentialsProvider provider = + VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { + AwsCredentials awsCredentials = provider.resolveCredentials(); + verifyCredentials(awsCredentials, credential); + + // resolving credentials multiple times should hit the credentials endpoint again + AwsCredentials refreshedCredentials = provider.resolveCredentials(); + assertThat(refreshedCredentials).isNotSameAs(awsCredentials); + // TODO + verifyCredentials(refreshedCredentials, credential); + } + + mockServer.verify(mockRequest, VerificationTimes.exactly(2)); + } + + @Test + public void credentialWithLongestPrefixIsUsed() { + HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); + Credential credentialOne = + ImmutableCredential.builder() + .prefix("s3") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey1", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey1", + S3FileIOProperties.SESSION_TOKEN, + "sessionToken1", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + Long.toString(Instant.now().plus(1, ChronoUnit.HOURS).toEpochMilli()))) + .build(); + Credential credentialTwo = + ImmutableCredential.builder() + .prefix("s3://custom-uri/longest-prefix") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey2", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey2", + S3FileIOProperties.SESSION_TOKEN, + "sessionToken2", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + Long.toString(Instant.now().plus(2, ChronoUnit.HOURS).toEpochMilli()))) + .build(); + Credential credentialThree = + ImmutableCredential.builder() + .prefix("s3://custom-uri/long") + .config( + ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, + "randomAccessKey3", + S3FileIOProperties.SECRET_ACCESS_KEY, + "randomSecretAccessKey3", + S3FileIOProperties.SESSION_TOKEN, + "sessionToken3", + S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS, + Long.toString(Instant.now().plus(3, ChronoUnit.HOURS).toEpochMilli()))) + .build(); + LoadCredentialsResponse response = + ImmutableLoadCredentialsResponse.builder() + .addCredentials(credentialOne, credentialTwo, credentialThree) + .build(); + + HttpResponse mockResponse = + response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200); + mockServer.when(mockRequest).respond(mockResponse); + + try (VendedCredentialsProvider provider = + VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { + AwsCredentials awsCredentials = provider.resolveCredentials(); + verifyCredentials(awsCredentials, credentialTwo); + } + } + + private void verifyCredentials(AwsCredentials awsCredentials, Credential credential) { + assertThat(awsCredentials).isInstanceOf(AwsSessionCredentials.class); + AwsSessionCredentials creds = (AwsSessionCredentials) awsCredentials; + + assertThat(creds.accessKeyId()) + .isEqualTo(credential.config().get(S3FileIOProperties.ACCESS_KEY_ID)); + assertThat(creds.secretAccessKey()) + .isEqualTo(credential.config().get(S3FileIOProperties.SECRET_ACCESS_KEY)); + assertThat(creds.sessionToken()) + .isEqualTo(credential.config().get(S3FileIOProperties.SESSION_TOKEN)); + assertThat(creds.expirationTime()) + .isPresent() + .get() + .extracting(Instant::toEpochMilli) + .isEqualTo( + Long.parseLong( + credential.config().get(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS))); + } +} From 11c5748556ec3a1769cfca0533da7db558c9d340 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Mon, 28 Oct 2024 09:27:06 +0100 Subject: [PATCH 2/4] only allow one S3 credential --- .../iceberg/aws/s3/VendedCredentialsProvider.java | 9 +++------ .../aws/s3/TestVendedCredentialsProvider.java | 12 ++++++------ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java index cf667284d813..e249d3ff1dec 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java @@ -20,10 +20,8 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.rest.ErrorHandlers; @@ -103,11 +101,10 @@ private RefreshResult refreshCredential() { .collect(Collectors.toList()); Preconditions.checkState(!s3Credentials.isEmpty(), "Invalid S3 Credentials: empty"); + Preconditions.checkState( + s3Credentials.size() == 1, "Invalid S3 Credentials: only one S3 credential should exist"); - Optional credentialWithPrefix = - s3Credentials.stream().max(Comparator.comparingInt(c -> c.prefix().length())); - - Credential s3Credential = credentialWithPrefix.orElseGet(() -> s3Credentials.get(0)); + Credential s3Credential = s3Credentials.get(0); checkCredential(s3Credential, S3FileIOProperties.ACCESS_KEY_ID); checkCredential(s3Credential, S3FileIOProperties.SECRET_ACCESS_KEY); checkCredential(s3Credential, S3FileIOProperties.SESSION_TOKEN); diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java index 9934c35a0503..67cd1cb55241 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java @@ -85,7 +85,7 @@ public void invalidOrMissingUri() { } @Test - public void noS3CredentialsInResponse() { + public void noS3Credentials() { HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); HttpResponse mockResponse = @@ -234,7 +234,6 @@ public void expiredToken() { // resolving credentials multiple times should hit the credentials endpoint again AwsCredentials refreshedCredentials = provider.resolveCredentials(); assertThat(refreshedCredentials).isNotSameAs(awsCredentials); - // TODO verifyCredentials(refreshedCredentials, credential); } @@ -242,11 +241,11 @@ public void expiredToken() { } @Test - public void credentialWithLongestPrefixIsUsed() { + public void multipleS3Credentials() { HttpRequest mockRequest = request("/v1/credentials").withMethod(HttpMethod.GET.name()); Credential credentialOne = ImmutableCredential.builder() - .prefix("s3") + .prefix("gcs") .config( ImmutableMap.of( S3FileIOProperties.ACCESS_KEY_ID, @@ -297,8 +296,9 @@ public void credentialWithLongestPrefixIsUsed() { try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(ImmutableMap.of(VendedCredentialsProvider.URI, URI))) { - AwsCredentials awsCredentials = provider.resolveCredentials(); - verifyCredentials(awsCredentials, credentialTwo); + assertThatThrownBy(provider::resolveCredentials) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid S3 Credentials: only one S3 credential should exist"); } } From 35f92113ad9c5ee47ddbcd6924339815e9d04a1f Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Mon, 28 Oct 2024 15:43:11 +0100 Subject: [PATCH 3/4] fetch token when a credential was given before retrieving vended creds --- .../aws/s3/VendedCredentialsProvider.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java index e249d3ff1dec..f8a3d2bacedd 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java @@ -27,10 +27,13 @@ import org.apache.iceberg.rest.ErrorHandlers; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.ResourcePaths; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.iceberg.rest.auth.OAuth2Util; import org.apache.iceberg.rest.credentials.Credential; import org.apache.iceberg.rest.responses.LoadCredentialsResponse; +import org.apache.iceberg.rest.responses.OAuthTokenResponse; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; @@ -83,12 +86,27 @@ private RESTClient httpClient() { } private LoadCredentialsResponse fetchCredentials() { + String initToken = properties.get(OAuth2Properties.TOKEN); + String credential = properties.get(OAuth2Properties.CREDENTIAL); + Map authHeaders = RESTUtil.merge(properties, OAuth2Util.authHeaders(initToken)); + if (credential != null && !credential.isEmpty()) { + OAuthTokenResponse authResponse = + OAuth2Util.fetchToken( + httpClient(), + authHeaders, + credential, + properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE), + properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens()), + OAuth2Util.buildOptionalParam(properties)); + authHeaders = RESTUtil.merge(authHeaders, OAuth2Util.authHeaders(authResponse.token())); + } + return httpClient() .get( properties.get(URI), null, LoadCredentialsResponse.class, - OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)), + authHeaders, ErrorHandlers.defaultErrorHandler()); } From 41fe92124d9390abea86363774b7c057cfe8960a Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Tue, 29 Oct 2024 17:16:21 +0100 Subject: [PATCH 4/4] Use OAuth token from properties for now --- .../iceberg/aws/AwsClientProperties.java | 6 ++---- .../aws/s3/VendedCredentialsProvider.java | 20 +------------------ 2 files changed, 3 insertions(+), 23 deletions(-) diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java index 0d659a3fa342..4f2d4d6a5a6c 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java @@ -68,10 +68,8 @@ public class AwsClientProperties implements Serializable { public static final String CLIENT_REGION = "client.region"; /** - * Configure the endpoint to be used to fetch and refresh vended credentials. - * - *

When set, the {@link VendedCredentialsProvider} will be used to fetch and refresh vended - * credentials. + * When set, the {@link VendedCredentialsProvider} will be used to fetch and refresh vended + * credentials from this endpoint. */ public static final String REFRESH_CREDENTIALS_ENDPOINT = "client.refresh-credentials-endpoint"; diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java index f8a3d2bacedd..e249d3ff1dec 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java @@ -27,13 +27,10 @@ import org.apache.iceberg.rest.ErrorHandlers; import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTClient; -import org.apache.iceberg.rest.RESTUtil; -import org.apache.iceberg.rest.ResourcePaths; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.iceberg.rest.auth.OAuth2Util; import org.apache.iceberg.rest.credentials.Credential; import org.apache.iceberg.rest.responses.LoadCredentialsResponse; -import org.apache.iceberg.rest.responses.OAuthTokenResponse; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; @@ -86,27 +83,12 @@ private RESTClient httpClient() { } private LoadCredentialsResponse fetchCredentials() { - String initToken = properties.get(OAuth2Properties.TOKEN); - String credential = properties.get(OAuth2Properties.CREDENTIAL); - Map authHeaders = RESTUtil.merge(properties, OAuth2Util.authHeaders(initToken)); - if (credential != null && !credential.isEmpty()) { - OAuthTokenResponse authResponse = - OAuth2Util.fetchToken( - httpClient(), - authHeaders, - credential, - properties.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE), - properties.getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens()), - OAuth2Util.buildOptionalParam(properties)); - authHeaders = RESTUtil.merge(authHeaders, OAuth2Util.authHeaders(authResponse.token())); - } - return httpClient() .get( properties.get(URI), null, LoadCredentialsResponse.class, - authHeaders, + OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)), ErrorHandlers.defaultErrorHandler()); }