Skip to content

Commit 5e51c5b

Browse files
committed
AWS: Refresh vended credentials
1 parent 35a02d0 commit 5e51c5b

File tree

5 files changed

+529
-5
lines changed

5 files changed

+529
-5
lines changed

aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.Serializable;
2222
import java.util.Map;
23+
import org.apache.iceberg.aws.s3.VendedCredentialsProvider;
2324
import org.apache.iceberg.common.DynClasses;
2425
import org.apache.iceberg.common.DynMethods;
2526
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -66,21 +67,39 @@ public class AwsClientProperties implements Serializable {
6667
*/
6768
public static final String CLIENT_REGION = "client.region";
6869

70+
/**
71+
* Configure the endpoint to be used to fetch and refresh vended credentials.
72+
*
73+
* <p>When set, the {@link VendedCredentialsProvider} will be used to fetch and refresh vended
74+
* credentials.
75+
*/
76+
public static final String REFRESH_CREDENTIALS_ENDPOINT = "client.refresh-credentials-endpoint";
77+
78+
/** Controls whether vended credentials should be refreshed or not. Defaults to true. */
79+
public static final String REFRESH_CREDENTIALS_ENABLED = "client.refresh-credentials-enabled";
80+
6981
private String clientRegion;
7082
private final String clientCredentialsProvider;
7183
private final Map<String, String> clientCredentialsProviderProperties;
84+
private final String refreshCredentialsEndpoint;
85+
private final boolean refreshCredentialsEnabled;
7286

7387
public AwsClientProperties() {
7488
this.clientRegion = null;
7589
this.clientCredentialsProvider = null;
7690
this.clientCredentialsProviderProperties = null;
91+
this.refreshCredentialsEndpoint = null;
92+
this.refreshCredentialsEnabled = true;
7793
}
7894

7995
public AwsClientProperties(Map<String, String> properties) {
8096
this.clientRegion = properties.get(CLIENT_REGION);
8197
this.clientCredentialsProvider = properties.get(CLIENT_CREDENTIALS_PROVIDER);
8298
this.clientCredentialsProviderProperties =
8399
PropertyUtil.propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX);
100+
this.refreshCredentialsEndpoint = properties.get(REFRESH_CREDENTIALS_ENDPOINT);
101+
this.refreshCredentialsEnabled =
102+
PropertyUtil.propertyAsBoolean(properties, REFRESH_CREDENTIALS_ENABLED, true);
84103
}
85104

86105
public String clientRegion() {
@@ -122,11 +141,12 @@ public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T b
122141
}
123142

124143
/**
125-
* Returns a credentials provider instance. If params were set, we return a new credentials
126-
* instance. If none of the params are set, we try to dynamically load the provided credentials
127-
* provider class. Upon loading the class, we try to invoke {@code create(Map<String, String>)}
128-
* static method. If that fails, we fall back to {@code create()}. If credential provider class
129-
* wasn't set, we fall back to default credentials provider.
144+
* Returns a credentials provider instance. If {@link #refreshCredentialsEndpoint} is set, an
145+
* instance of {@link VendedCredentialsProvider} is returned. If params were set, we return a new
146+
* credentials instance. If none of the params are set, we try to dynamically load the provided
147+
* credentials provider class. Upon loading the class, we try to invoke {@code create(Map<String,
148+
* String>)} static method. If that fails, we fall back to {@code create()}. If credential
149+
* provider class wasn't set, we fall back to default credentials provider.
130150
*
131151
* @param accessKeyId the AWS access key ID
132152
* @param secretAccessKey the AWS secret access key
@@ -136,6 +156,12 @@ public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T b
136156
@SuppressWarnings("checkstyle:HiddenField")
137157
public AwsCredentialsProvider credentialsProvider(
138158
String accessKeyId, String secretAccessKey, String sessionToken) {
159+
if (refreshCredentialsEnabled && !Strings.isNullOrEmpty(refreshCredentialsEndpoint)) {
160+
clientCredentialsProviderProperties.put(
161+
VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
162+
return credentialsProvider(VendedCredentialsProvider.class.getName());
163+
}
164+
139165
if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) {
140166
if (Strings.isNullOrEmpty(sessionToken)) {
141167
return StaticCredentialsProvider.create(

aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,13 @@ public class S3FileIOProperties implements Serializable {
225225
*/
226226
public static final String SESSION_TOKEN = "s3.session-token";
227227

228+
/**
229+
* Configure the expiration time in millis of the static session token used to access S3FileIO.
230+
* This expiration time is currently only used in {@link VendedCredentialsProvider} for refreshing
231+
* vended credentials.
232+
*/
233+
public static final String SESSION_TOKEN_EXPIRES_AT_MS = "s3.session-token-expires-at-ms";
234+
228235
/**
229236
* Enable to make S3FileIO, to make cross-region call to the region specified in the ARN of an
230237
* access point.
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.aws.s3;
20+
21+
import java.time.Instant;
22+
import java.time.temporal.ChronoUnit;
23+
import java.util.Comparator;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Optional;
27+
import java.util.concurrent.ThreadLocalRandom;
28+
import java.util.stream.Collectors;
29+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
30+
import org.apache.iceberg.rest.ErrorHandlers;
31+
import org.apache.iceberg.rest.HTTPClient;
32+
import org.apache.iceberg.rest.RESTClient;
33+
import org.apache.iceberg.rest.auth.OAuth2Properties;
34+
import org.apache.iceberg.rest.auth.OAuth2Util;
35+
import org.apache.iceberg.rest.credentials.Credential;
36+
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
37+
import software.amazon.awssdk.auth.credentials.AwsCredentials;
38+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
39+
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
40+
import software.amazon.awssdk.utils.IoUtils;
41+
import software.amazon.awssdk.utils.SdkAutoCloseable;
42+
import software.amazon.awssdk.utils.cache.CachedSupplier;
43+
import software.amazon.awssdk.utils.cache.RefreshResult;
44+
45+
public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAutoCloseable {
46+
public static final String URI = "credentials.uri";
47+
private volatile HTTPClient client;
48+
private final Map<String, String> properties;
49+
private final CachedSupplier<AwsCredentials> credentialCache;
50+
51+
private VendedCredentialsProvider(Map<String, String> properties) {
52+
Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null");
53+
this.properties = properties;
54+
this.credentialCache =
55+
CachedSupplier.builder(this::refreshCredential)
56+
.cachedValueName(VendedCredentialsProvider.class.getName())
57+
.build();
58+
}
59+
60+
@Override
61+
public AwsCredentials resolveCredentials() {
62+
return credentialCache.get();
63+
}
64+
65+
@Override
66+
public void close() {
67+
IoUtils.closeQuietly(client, null);
68+
credentialCache.close();
69+
}
70+
71+
public static VendedCredentialsProvider create(Map<String, String> properties) {
72+
return new VendedCredentialsProvider(properties);
73+
}
74+
75+
private RESTClient httpClient() {
76+
if (null == client) {
77+
synchronized (this) {
78+
if (null == client) {
79+
client = HTTPClient.builder(properties).uri(properties.get(URI)).build();
80+
}
81+
}
82+
}
83+
84+
return client;
85+
}
86+
87+
private RefreshResult<AwsCredentials> refreshCredential() {
88+
LoadCredentialsResponse response = fetchCredentials();
89+
90+
List<Credential> s3Credentials =
91+
response.credentials().stream()
92+
.filter(c -> c.prefix().startsWith("s3"))
93+
.collect(Collectors.toList());
94+
95+
Preconditions.checkState(!s3Credentials.isEmpty(), "Invalid S3 Credentials: empty");
96+
97+
Optional<Credential> credentialWithPrefix =
98+
s3Credentials.stream().max(Comparator.comparingInt(c -> c.prefix().length()));
99+
100+
Credential s3Credential = credentialWithPrefix.orElseGet(() -> s3Credentials.get(0));
101+
checkCredential(s3Credential, S3FileIOProperties.ACCESS_KEY_ID);
102+
checkCredential(s3Credential, S3FileIOProperties.SECRET_ACCESS_KEY);
103+
checkCredential(s3Credential, S3FileIOProperties.SESSION_TOKEN);
104+
checkCredential(s3Credential, S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS);
105+
106+
String accessKeyId = s3Credential.config().get(S3FileIOProperties.ACCESS_KEY_ID);
107+
String secretAccessKey = s3Credential.config().get(S3FileIOProperties.SECRET_ACCESS_KEY);
108+
String sessionToken = s3Credential.config().get(S3FileIOProperties.SESSION_TOKEN);
109+
String tokenExpiresAtMillis =
110+
s3Credential.config().get(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS);
111+
Instant expiresAt = Instant.ofEpochMilli(Long.parseLong(tokenExpiresAtMillis));
112+
int jitter = ThreadLocalRandom.current().nextInt(0, 100);
113+
Instant prefetchAt = expiresAt.minus(5, ChronoUnit.MINUTES).plusSeconds(jitter);
114+
115+
return RefreshResult.builder(
116+
(AwsCredentials)
117+
AwsSessionCredentials.builder()
118+
.accessKeyId(accessKeyId)
119+
.secretAccessKey(secretAccessKey)
120+
.sessionToken(sessionToken)
121+
.expirationTime(expiresAt)
122+
.build())
123+
.staleTime(expiresAt)
124+
.prefetchTime(prefetchAt)
125+
.build();
126+
}
127+
128+
private void checkCredential(Credential credential, String property) {
129+
Preconditions.checkState(
130+
credential.config().containsKey(property), "Invalid S3 Credentials: %s not set", property);
131+
}
132+
133+
private LoadCredentialsResponse fetchCredentials() {
134+
return httpClient()
135+
.get(
136+
properties.get(URI),
137+
null,
138+
LoadCredentialsResponse.class,
139+
OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)),
140+
ErrorHandlers.defaultErrorHandler());
141+
}
142+
}

aws/src/test/java/org/apache/iceberg/aws/AwsClientPropertiesTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.assertj.core.api.Assertions.assertThat;
2222

2323
import java.util.Map;
24+
import org.apache.iceberg.aws.s3.VendedCredentialsProvider;
25+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
2426
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
2527
import org.junit.jupiter.api.Test;
2628
import org.mockito.ArgumentCaptor;
@@ -29,6 +31,7 @@
2931
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
3032
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
3133
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
34+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
3235
import software.amazon.awssdk.regions.Region;
3336
import software.amazon.awssdk.services.s3.S3ClientBuilder;
3437

@@ -111,4 +114,30 @@ public void testSessionCredentialsConfiguration() {
111114
.as("The secret access key should be the same as the one set by tag SECRET_ACCESS_KEY")
112115
.isEqualTo("secret");
113116
}
117+
118+
@Test
119+
public void refreshCredentialsEndpoint() {
120+
AwsClientProperties awsClientProperties =
121+
new AwsClientProperties(
122+
ImmutableMap.of(
123+
AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
124+
"http://localhost:1234/v1/credentials"));
125+
126+
assertThat(awsClientProperties.credentialsProvider("key", "secret", "token"))
127+
.isInstanceOf(VendedCredentialsProvider.class);
128+
}
129+
130+
@Test
131+
public void refreshCredentialsEndpointSetButRefreshDisabled() {
132+
AwsClientProperties awsClientProperties =
133+
new AwsClientProperties(
134+
ImmutableMap.of(
135+
AwsClientProperties.REFRESH_CREDENTIALS_ENABLED,
136+
"false",
137+
AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
138+
"http://localhost:1234/v1/credentials"));
139+
140+
assertThat(awsClientProperties.credentialsProvider("key", "secret", "token"))
141+
.isInstanceOf(StaticCredentialsProvider.class);
142+
}
114143
}

0 commit comments

Comments
 (0)