Skip to content

Commit 2010347

Browse files
nastrazachdisc
authored andcommitted
AWS: Refresh vended credentials (apache#11389)
1 parent 0dd8015 commit 2010347

File tree

5 files changed

+526
-5
lines changed

5 files changed

+526
-5
lines changed

aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.Serializable;
2222
import java.util.Map;
23+
import org.apache.iceberg.aws.s3.VendedCredentialsProvider;
2324
import org.apache.iceberg.common.DynClasses;
2425
import org.apache.iceberg.common.DynMethods;
2526
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -66,21 +67,37 @@ public class AwsClientProperties implements Serializable {
6667
*/
6768
public static final String CLIENT_REGION = "client.region";
6869

70+
/**
71+
* When set, the {@link VendedCredentialsProvider} will be used to fetch and refresh vended
72+
* credentials from this endpoint.
73+
*/
74+
public static final String REFRESH_CREDENTIALS_ENDPOINT = "client.refresh-credentials-endpoint";
75+
76+
/** Controls whether vended credentials should be refreshed or not. Defaults to true. */
77+
public static final String REFRESH_CREDENTIALS_ENABLED = "client.refresh-credentials-enabled";
78+
6979
private String clientRegion;
7080
private final String clientCredentialsProvider;
7181
private final Map<String, String> clientCredentialsProviderProperties;
82+
private final String refreshCredentialsEndpoint;
83+
private final boolean refreshCredentialsEnabled;
7284

7385
public AwsClientProperties() {
7486
this.clientRegion = null;
7587
this.clientCredentialsProvider = null;
7688
this.clientCredentialsProviderProperties = null;
89+
this.refreshCredentialsEndpoint = null;
90+
this.refreshCredentialsEnabled = true;
7791
}
7892

7993
public AwsClientProperties(Map<String, String> properties) {
8094
this.clientRegion = properties.get(CLIENT_REGION);
8195
this.clientCredentialsProvider = properties.get(CLIENT_CREDENTIALS_PROVIDER);
8296
this.clientCredentialsProviderProperties =
8397
PropertyUtil.propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX);
98+
this.refreshCredentialsEndpoint = properties.get(REFRESH_CREDENTIALS_ENDPOINT);
99+
this.refreshCredentialsEnabled =
100+
PropertyUtil.propertyAsBoolean(properties, REFRESH_CREDENTIALS_ENABLED, true);
84101
}
85102

86103
public String clientRegion() {
@@ -122,11 +139,12 @@ public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T b
122139
}
123140

124141
/**
125-
* Returns a credentials provider instance. If params were set, we return a new credentials
126-
* instance. If none of the params are set, we try to dynamically load the provided credentials
127-
* provider class. Upon loading the class, we try to invoke {@code create(Map<String, String>)}
128-
* static method. If that fails, we fall back to {@code create()}. If credential provider class
129-
* wasn't set, we fall back to default credentials provider.
142+
* Returns a credentials provider instance. If {@link #refreshCredentialsEndpoint} is set, an
143+
* instance of {@link VendedCredentialsProvider} is returned. If params were set, we return a new
144+
* credentials instance. If none of the params are set, we try to dynamically load the provided
145+
* credentials provider class. Upon loading the class, we try to invoke {@code create(Map<String,
146+
* String>)} static method. If that fails, we fall back to {@code create()}. If credential
147+
* provider class wasn't set, we fall back to default credentials provider.
130148
*
131149
* @param accessKeyId the AWS access key ID
132150
* @param secretAccessKey the AWS secret access key
@@ -136,6 +154,12 @@ public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T b
136154
@SuppressWarnings("checkstyle:HiddenField")
137155
public AwsCredentialsProvider credentialsProvider(
138156
String accessKeyId, String secretAccessKey, String sessionToken) {
157+
if (refreshCredentialsEnabled && !Strings.isNullOrEmpty(refreshCredentialsEndpoint)) {
158+
clientCredentialsProviderProperties.put(
159+
VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
160+
return credentialsProvider(VendedCredentialsProvider.class.getName());
161+
}
162+
139163
if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) {
140164
if (Strings.isNullOrEmpty(sessionToken)) {
141165
return StaticCredentialsProvider.create(

aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,13 @@ public class S3FileIOProperties implements Serializable {
225225
*/
226226
public static final String SESSION_TOKEN = "s3.session-token";
227227

228+
/**
229+
* Configure the expiration time in millis of the static session token used to access S3FileIO.
230+
* This expiration time is currently only used in {@link VendedCredentialsProvider} for refreshing
231+
* vended credentials.
232+
*/
233+
static final String SESSION_TOKEN_EXPIRES_AT_MS = "s3.session-token-expires-at-ms";
234+
228235
/**
229236
* Enable to make S3FileIO, to make cross-region call to the region specified in the ARN of an
230237
* access point.
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.aws.s3;
20+
21+
import java.time.Instant;
22+
import java.time.temporal.ChronoUnit;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.stream.Collectors;
26+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
27+
import org.apache.iceberg.rest.ErrorHandlers;
28+
import org.apache.iceberg.rest.HTTPClient;
29+
import org.apache.iceberg.rest.RESTClient;
30+
import org.apache.iceberg.rest.auth.OAuth2Properties;
31+
import org.apache.iceberg.rest.auth.OAuth2Util;
32+
import org.apache.iceberg.rest.credentials.Credential;
33+
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
34+
import software.amazon.awssdk.auth.credentials.AwsCredentials;
35+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
36+
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
37+
import software.amazon.awssdk.utils.IoUtils;
38+
import software.amazon.awssdk.utils.SdkAutoCloseable;
39+
import software.amazon.awssdk.utils.cache.CachedSupplier;
40+
import software.amazon.awssdk.utils.cache.RefreshResult;
41+
42+
public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAutoCloseable {
43+
public static final String URI = "credentials.uri";
44+
private volatile HTTPClient client;
45+
private final Map<String, String> properties;
46+
private final CachedSupplier<AwsCredentials> credentialCache;
47+
48+
private VendedCredentialsProvider(Map<String, String> properties) {
49+
Preconditions.checkArgument(null != properties, "Invalid properties: null");
50+
Preconditions.checkArgument(null != properties.get(URI), "Invalid URI: null");
51+
this.properties = properties;
52+
this.credentialCache =
53+
CachedSupplier.builder(this::refreshCredential)
54+
.cachedValueName(VendedCredentialsProvider.class.getName())
55+
.build();
56+
}
57+
58+
@Override
59+
public AwsCredentials resolveCredentials() {
60+
return credentialCache.get();
61+
}
62+
63+
@Override
64+
public void close() {
65+
IoUtils.closeQuietly(client, null);
66+
credentialCache.close();
67+
}
68+
69+
public static VendedCredentialsProvider create(Map<String, String> properties) {
70+
return new VendedCredentialsProvider(properties);
71+
}
72+
73+
private RESTClient httpClient() {
74+
if (null == client) {
75+
synchronized (this) {
76+
if (null == client) {
77+
client = HTTPClient.builder(properties).uri(properties.get(URI)).build();
78+
}
79+
}
80+
}
81+
82+
return client;
83+
}
84+
85+
private LoadCredentialsResponse fetchCredentials() {
86+
return httpClient()
87+
.get(
88+
properties.get(URI),
89+
null,
90+
LoadCredentialsResponse.class,
91+
OAuth2Util.authHeaders(properties.get(OAuth2Properties.TOKEN)),
92+
ErrorHandlers.defaultErrorHandler());
93+
}
94+
95+
private RefreshResult<AwsCredentials> refreshCredential() {
96+
LoadCredentialsResponse response = fetchCredentials();
97+
98+
List<Credential> s3Credentials =
99+
response.credentials().stream()
100+
.filter(c -> c.prefix().startsWith("s3"))
101+
.collect(Collectors.toList());
102+
103+
Preconditions.checkState(!s3Credentials.isEmpty(), "Invalid S3 Credentials: empty");
104+
Preconditions.checkState(
105+
s3Credentials.size() == 1, "Invalid S3 Credentials: only one S3 credential should exist");
106+
107+
Credential s3Credential = s3Credentials.get(0);
108+
checkCredential(s3Credential, S3FileIOProperties.ACCESS_KEY_ID);
109+
checkCredential(s3Credential, S3FileIOProperties.SECRET_ACCESS_KEY);
110+
checkCredential(s3Credential, S3FileIOProperties.SESSION_TOKEN);
111+
checkCredential(s3Credential, S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS);
112+
113+
String accessKeyId = s3Credential.config().get(S3FileIOProperties.ACCESS_KEY_ID);
114+
String secretAccessKey = s3Credential.config().get(S3FileIOProperties.SECRET_ACCESS_KEY);
115+
String sessionToken = s3Credential.config().get(S3FileIOProperties.SESSION_TOKEN);
116+
String tokenExpiresAtMillis =
117+
s3Credential.config().get(S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS);
118+
Instant expiresAt = Instant.ofEpochMilli(Long.parseLong(tokenExpiresAtMillis));
119+
Instant prefetchAt = expiresAt.minus(5, ChronoUnit.MINUTES);
120+
121+
return RefreshResult.builder(
122+
(AwsCredentials)
123+
AwsSessionCredentials.builder()
124+
.accessKeyId(accessKeyId)
125+
.secretAccessKey(secretAccessKey)
126+
.sessionToken(sessionToken)
127+
.expirationTime(expiresAt)
128+
.build())
129+
.staleTime(expiresAt)
130+
.prefetchTime(prefetchAt)
131+
.build();
132+
}
133+
134+
private void checkCredential(Credential credential, String property) {
135+
Preconditions.checkState(
136+
credential.config().containsKey(property), "Invalid S3 Credentials: %s not set", property);
137+
}
138+
}

aws/src/test/java/org/apache/iceberg/aws/AwsClientPropertiesTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.assertj.core.api.Assertions.assertThat;
2222

2323
import java.util.Map;
24+
import org.apache.iceberg.aws.s3.VendedCredentialsProvider;
25+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
2426
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
2527
import org.junit.jupiter.api.Test;
2628
import org.mockito.ArgumentCaptor;
@@ -29,6 +31,7 @@
2931
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
3032
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
3133
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
34+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
3235
import software.amazon.awssdk.regions.Region;
3336
import software.amazon.awssdk.services.s3.S3ClientBuilder;
3437

@@ -111,4 +114,30 @@ public void testSessionCredentialsConfiguration() {
111114
.as("The secret access key should be the same as the one set by tag SECRET_ACCESS_KEY")
112115
.isEqualTo("secret");
113116
}
117+
118+
@Test
119+
public void refreshCredentialsEndpoint() {
120+
AwsClientProperties awsClientProperties =
121+
new AwsClientProperties(
122+
ImmutableMap.of(
123+
AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
124+
"http://localhost:1234/v1/credentials"));
125+
126+
assertThat(awsClientProperties.credentialsProvider("key", "secret", "token"))
127+
.isInstanceOf(VendedCredentialsProvider.class);
128+
}
129+
130+
@Test
131+
public void refreshCredentialsEndpointSetButRefreshDisabled() {
132+
AwsClientProperties awsClientProperties =
133+
new AwsClientProperties(
134+
ImmutableMap.of(
135+
AwsClientProperties.REFRESH_CREDENTIALS_ENABLED,
136+
"false",
137+
AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
138+
"http://localhost:1234/v1/credentials"));
139+
140+
assertThat(awsClientProperties.credentialsProvider("key", "secret", "token"))
141+
.isInstanceOf(StaticCredentialsProvider.class);
142+
}
114143
}

0 commit comments

Comments
 (0)