Skip to content

Commit 8e99fe0

Browse files
authored
feat: Support customizing S3 endpoints (apache#1913) (apache#93)
No functional change. Introduce a dedicated interface for `StsClient` suppliers and implement it using a pool of cached clients. All client are "thin" and share the same `SdkHttpClient`. The latter is closed when the server shuts down. This is a step towards supporting non-AWS S3 storage (apache#1530). For this reason the STS endpoint is present in new interfaces, but is not used yet.
1 parent 9953b21 commit 8e99fe0

File tree

27 files changed

+1280
-150
lines changed

27 files changed

+1280
-150
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
3535

3636
### New Features
3737

38+
- Added Catalog configuration for S3 and STS endpoints. This also allows using non-AWS S3 implementations.
39+
3840
### Changes
3941

4042
### Deprecations

LICENSE

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,12 @@ This product includes code from Project Nessie.
319319
* tools/config-docs/generator/src/test/java/tests/smallrye/SomeEnum.java
320320
* tools/config-docs/generator/src/test/java/tests/smallrye/VeryNested.java
321321
* tools/container-spec-helper/src/main/java/org/apache/polaris/containerspec/ContainerSpecHelper.java
322+
* tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/Minio.java
323+
* tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioAccess.java
324+
* tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioContainer.java
325+
* tools/minio-testcontainer/src/main/java/org/apache/polaris/test/minio/MinioExtension.java
322326
* runtime/admin/src/main/java/org/apache/polaris/admintool/PolarisAdminTool.java
327+
* runtime/service/src/main/java/org/apache/polaris/service/storage/aws/StsClientsPool.java
323328
* helm/polaris/tests/logging_storage_test.yaml
324329
* helm/polaris/tests/quantity_test.yaml
325330
* helm/polaris/tests/service_monitor_test.yaml

bom/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dependencies {
3030
api(project(":polaris-api-management-service"))
3131

3232
api(project(":polaris-container-spec-helper"))
33+
api(project(":polaris-minio-testcontainer"))
3334
api(project(":polaris-immutables"))
3435
api(project(":polaris-misc-types"))
3536
api(project(":polaris-version"))

gradle/projects.main.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ polaris-tests=integration-tests
3939
aggregated-license-report=aggregated-license-report
4040
polaris-immutables=tools/immutables
4141
polaris-container-spec-helper=tools/container-spec-helper
42+
polaris-minio-testcontainer=tools/minio-testcontainer
4243
polaris-version=tools/version
4344
polaris-misc-types=tools/misc-types
4445

polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,9 @@ public Builder setStorageConfigurationInfo(
273273
new ArrayList<>(allowedLocations),
274274
awsConfigModel.getRoleArn(),
275275
awsConfigModel.getExternalId(),
276-
awsConfigModel.getRegion());
276+
awsConfigModel.getRegion(),
277+
awsConfigModel.getEndpoint(),
278+
awsConfigModel.getStsEndpoint());
277279
awsConfig.validateArn(awsConfigModel.getRoleArn());
278280
config = awsConfig;
279281
break;

polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.polaris.core.storage.InMemoryStorageIntegration;
3333
import org.apache.polaris.core.storage.StorageAccessProperty;
3434
import org.apache.polaris.core.storage.StorageUtil;
35+
import org.apache.polaris.core.storage.aws.StsClientProvider.StsDestination;
3536
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
3637
import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
3738
import software.amazon.awssdk.policybuilder.iam.IamEffect;
@@ -45,28 +46,22 @@
4546
/** Credential vendor that supports generating */
4647
public class AwsCredentialsStorageIntegration
4748
extends InMemoryStorageIntegration<AwsStorageConfigurationInfo> {
48-
private final StsClient stsClient;
49+
private final StsClientProvider stsClientProvider;
4950
private final Optional<AwsCredentialsProvider> credentialsProvider;
50-
private final Map<StorageAccessProperty, String> awsCredentialsAdditionalProperties =
51-
new HashMap<>();
5251

53-
public AwsCredentialsStorageIntegration(StsClient stsClient) {
54-
this(stsClient, Optional.empty());
52+
public AwsCredentialsStorageIntegration(StsClient fixedClient) {
53+
this((destination) -> fixedClient);
5554
}
5655

57-
public AwsCredentialsStorageIntegration(
58-
StsClient stsClient, Optional<AwsCredentialsProvider> credentialsProvider) {
59-
this(stsClient, credentialsProvider, Map.of());
56+
public AwsCredentialsStorageIntegration(StsClientProvider stsClientProvider) {
57+
this(stsClientProvider, Optional.empty());
6058
}
6159

6260
public AwsCredentialsStorageIntegration(
63-
StsClient stsClient,
64-
Optional<AwsCredentialsProvider> credentialsProvider,
65-
Map<StorageAccessProperty, String> awsCredentialsAdditionalProperties) {
61+
StsClientProvider stsClientProvider, Optional<AwsCredentialsProvider> credentialsProvider) {
6662
super(AwsCredentialsStorageIntegration.class.getName());
67-
this.stsClient = stsClient;
63+
this.stsClientProvider = stsClientProvider;
6864
this.credentialsProvider = credentialsProvider;
69-
this.awsCredentialsAdditionalProperties.putAll(awsCredentialsAdditionalProperties);
7065
}
7166

7267
/** {@inheritDoc} */
@@ -77,29 +72,36 @@ public EnumMap<StorageAccessProperty, String> getSubscopedCreds(
7772
boolean allowListOperation,
7873
@Nonnull Set<String> allowedReadLocations,
7974
@Nonnull Set<String> allowedWriteLocations) {
75+
int storageCredentialDurationSeconds =
76+
callContext
77+
.getPolarisCallContext()
78+
.getConfigurationStore()
79+
.getConfiguration(callContext.getRealmContext(), STORAGE_CREDENTIAL_DURATION_SECONDS);
80+
AssumeRoleRequest.Builder request =
81+
AssumeRoleRequest.builder()
82+
.externalId(storageConfig.getExternalId())
83+
.roleArn(storageConfig.getRoleARN())
84+
.roleSessionName("PolarisAwsCredentialsStorageIntegration")
85+
.policy(
86+
policyString(
87+
storageConfig.getRoleARN(),
88+
allowListOperation,
89+
allowedReadLocations,
90+
allowedWriteLocations)
91+
.toJson())
92+
.durationSeconds(storageCredentialDurationSeconds);
93+
credentialsProvider.ifPresent(
94+
cp -> request.overrideConfiguration(b -> b.credentialsProvider(cp)));
95+
96+
@SuppressWarnings("resource")
97+
// Note: stsClientProvider returns "thin" clients that do not need closing
98+
StsClient stsClient =
99+
stsClientProvider.stsClient(
100+
StsDestination.of(storageConfig.getStsEndpointUri(), storageConfig.getRegion()));
101+
80102
EnumMap<StorageAccessProperty, String> credentialMap =
81103
new EnumMap<>(StorageAccessProperty.class);
82104
if (stsClient != null) {
83-
int storageCredentialDurationSeconds =
84-
callContext
85-
.getPolarisCallContext()
86-
.getConfigurationStore()
87-
.getConfiguration(callContext.getRealmContext(), STORAGE_CREDENTIAL_DURATION_SECONDS);
88-
AssumeRoleRequest.Builder request =
89-
AssumeRoleRequest.builder()
90-
.externalId(storageConfig.getExternalId())
91-
.roleArn(storageConfig.getRoleARN())
92-
.roleSessionName("PolarisAwsCredentialsStorageIntegration")
93-
.policy(
94-
policyString(
95-
storageConfig.getRoleARN(),
96-
allowListOperation,
97-
allowedReadLocations,
98-
allowedWriteLocations)
99-
.toJson())
100-
.durationSeconds(storageCredentialDurationSeconds);
101-
credentialsProvider.ifPresent(
102-
cp -> request.overrideConfiguration(b -> b.credentialsProvider(cp)));
103105
AssumeRoleResponse response = stsClient.assumeRole(request.build());
104106
credentialMap.put(StorageAccessProperty.AWS_KEY_ID, response.credentials().accessKeyId());
105107
credentialMap.put(
@@ -120,17 +122,18 @@ public EnumMap<StorageAccessProperty, String> getSubscopedCreds(
120122
credentialMap.put(StorageAccessProperty.CLIENT_REGION, storageConfig.getRegion());
121123
}
122124

125+
URI endpointUri = storageConfig.getEndpointUri();
126+
if (endpointUri != null) {
127+
credentialMap.put(StorageAccessProperty.AWS_ENDPOINT, endpointUri.toString());
128+
}
129+
123130
if (storageConfig.getAwsPartition().equals("aws-us-gov")
124131
&& credentialMap.get(StorageAccessProperty.CLIENT_REGION) == null) {
125132
throw new IllegalArgumentException(
126133
String.format(
127134
"AWS region must be set when using partition %s", storageConfig.getAwsPartition()));
128135
}
129136

130-
if (!awsCredentialsAdditionalProperties.isEmpty()) {
131-
awsCredentialsAdditionalProperties.forEach(credentialMap::putIfAbsent);
132-
}
133-
134137
return credentialMap;
135138
}
136139

polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsStorageConfigurationInfo.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.common.base.MoreObjects;
2525
import jakarta.annotation.Nonnull;
2626
import jakarta.annotation.Nullable;
27+
import java.net.URI;
2728
import java.util.List;
2829
import java.util.regex.Matcher;
2930
import java.util.regex.Pattern;
@@ -53,14 +54,38 @@ public class AwsStorageConfigurationInfo extends PolarisStorageConfigurationInfo
5354
@JsonProperty(value = "region")
5455
private @Nullable String region = null;
5556

57+
/** Endpoint URI for S3 API calls */
58+
@JsonProperty(value = "endpoint")
59+
private @Nullable String endpoint;
60+
61+
/** Endpoint URI for STS API calls */
62+
@JsonProperty(value = "stsEndpoint")
63+
private @Nullable String stsEndpoint;
64+
5665
@JsonCreator
5766
public AwsStorageConfigurationInfo(
5867
@JsonProperty(value = "storageType", required = true) @Nonnull StorageType storageType,
5968
@JsonProperty(value = "allowedLocations", required = true) @Nonnull
6069
List<String> allowedLocations,
6170
@JsonProperty(value = "roleARN", required = true) @Nonnull String roleARN,
62-
@JsonProperty(value = "region", required = false) @Nullable String region) {
63-
this(storageType, allowedLocations, roleARN, null, region);
71+
@JsonProperty(value = "externalId") @Nullable String externalId,
72+
@JsonProperty(value = "region", required = false) @Nullable String region,
73+
@JsonProperty(value = "endpoint") @Nullable String endpoint,
74+
@JsonProperty(value = "stsEndpoint") @Nullable String stsEndpoint) {
75+
super(storageType, allowedLocations);
76+
this.roleARN = roleARN;
77+
this.externalId = externalId;
78+
this.region = region;
79+
this.endpoint = endpoint;
80+
this.stsEndpoint = stsEndpoint;
81+
}
82+
83+
public AwsStorageConfigurationInfo(
84+
@Nonnull StorageType storageType,
85+
@Nonnull List<String> allowedLocations,
86+
@Nonnull String roleARN,
87+
@Nullable String region) {
88+
this(storageType, allowedLocations, roleARN, null, region, null, null);
6489
}
6590

6691
public AwsStorageConfigurationInfo(
@@ -69,10 +94,7 @@ public AwsStorageConfigurationInfo(
6994
@Nonnull String roleARN,
7095
@Nullable String externalId,
7196
@Nullable String region) {
72-
super(storageType, allowedLocations);
73-
this.roleARN = roleARN;
74-
this.externalId = externalId;
75-
this.region = region;
97+
this(storageType, allowedLocations, roleARN, externalId, region, null, null);
7698
}
7799

78100
@Override
@@ -121,6 +143,19 @@ public void setRegion(@Nullable String region) {
121143
this.region = region;
122144
}
123145

146+
@JsonIgnore
147+
@Nullable
148+
public URI getEndpointUri() {
149+
return endpoint == null ? null : URI.create(endpoint);
150+
}
151+
152+
/** Returns the STS endpoint if set, defaulting to {@link #getEndpointUri()} otherwise. */
153+
@JsonIgnore
154+
@Nullable
155+
public URI getStsEndpointUri() {
156+
return stsEndpoint == null ? getEndpointUri() : URI.create(stsEndpoint);
157+
}
158+
124159
@JsonIgnore
125160
public String getAwsAccountId() {
126161
return parseAwsAccountId(roleARN);
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.core.storage.aws;
21+
22+
import jakarta.annotation.Nullable;
23+
import java.net.URI;
24+
import java.util.Optional;
25+
import org.apache.polaris.immutables.PolarisImmutable;
26+
import org.immutables.value.Value;
27+
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
28+
import software.amazon.awssdk.regions.Region;
29+
import software.amazon.awssdk.services.sts.StsBaseClientBuilder;
30+
import software.amazon.awssdk.services.sts.StsClient;
31+
import software.amazon.awssdk.services.sts.endpoints.StsEndpointProvider;
32+
33+
public interface StsClientProvider {
34+
35+
/**
36+
* Returns an STS client for the given destination (endpoint + region). The returned client may
37+
* not be a fresh instance for every call, however the client is reusable for multiple concurrent
38+
* requests from multiple threads. If the endpoint or region parameters are not specified, AWS SDK
39+
* defaults will be used.
40+
*
41+
* @param destination Endpoint and Region data for the client. Both values are optional.
42+
*/
43+
StsClient stsClient(StsDestination destination);
44+
45+
@PolarisImmutable
46+
interface StsDestination {
47+
/** Corresponds to {@link StsBaseClientBuilder#endpointProvider(StsEndpointProvider)} */
48+
@Value.Parameter(order = 1)
49+
Optional<URI> endpoint();
50+
51+
/** Corresponds to {@link AwsClientBuilder#region(Region)} */
52+
@Value.Parameter(order = 2)
53+
Optional<String> region();
54+
55+
static StsDestination of(@Nullable URI endpoint, @Nullable String region) {
56+
return ImmutableStsDestination.of(Optional.ofNullable(endpoint), Optional.ofNullable(region));
57+
}
58+
}
59+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.core.storage.aws;
21+
22+
import static org.apache.polaris.core.storage.PolarisStorageConfigurationInfo.StorageType.S3;
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
25+
import java.net.URI;
26+
import java.util.List;
27+
import org.junit.jupiter.api.Test;
28+
29+
public class AwsStorageConfigurationInfoTest {
30+
31+
private static AwsStorageConfigurationInfo config(String endpoint, String stsEndpoint) {
32+
return new AwsStorageConfigurationInfo(
33+
S3, List.of(), "role", null, null, endpoint, stsEndpoint);
34+
}
35+
36+
@Test
37+
public void testStsEndpoint() {
38+
assertThat(config(null, null))
39+
.extracting(
40+
AwsStorageConfigurationInfo::getEndpointUri,
41+
AwsStorageConfigurationInfo::getStsEndpointUri)
42+
.containsExactly(null, null);
43+
assertThat(config(null, "http://sts.example.com"))
44+
.extracting(
45+
AwsStorageConfigurationInfo::getEndpointUri,
46+
AwsStorageConfigurationInfo::getStsEndpointUri)
47+
.containsExactly(null, URI.create("http://sts.example.com"));
48+
assertThat(config("http://s3.example.com", null))
49+
.extracting(
50+
AwsStorageConfigurationInfo::getEndpointUri,
51+
AwsStorageConfigurationInfo::getStsEndpointUri)
52+
.containsExactly(URI.create("http://s3.example.com"), URI.create("http://s3.example.com"));
53+
assertThat(config("http://s3.example.com", "http://sts.example.com"))
54+
.extracting(
55+
AwsStorageConfigurationInfo::getEndpointUri,
56+
AwsStorageConfigurationInfo::getStsEndpointUri)
57+
.containsExactly(URI.create("http://s3.example.com"), URI.create("http://sts.example.com"));
58+
}
59+
}

runtime/service/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ dependencies {
8484
implementation("software.amazon.awssdk:sts")
8585
implementation("software.amazon.awssdk:iam-policy-builder")
8686
implementation("software.amazon.awssdk:s3")
87+
implementation("software.amazon.awssdk:apache-client") {
88+
exclude("commons-logging", "commons-logging")
89+
}
8790
implementation(platform(libs.azuresdk.bom))
8891
implementation("com.azure:azure-core")
8992

@@ -106,6 +109,8 @@ dependencies {
106109
testImplementation(project(":polaris-api-management-model"))
107110
testImplementation(testFixtures(project(":polaris-service-common")))
108111

112+
testImplementation(project(":polaris-minio-testcontainer"))
113+
109114
testImplementation("org.apache.iceberg:iceberg-api:${libs.versions.iceberg.get()}:tests")
110115
testImplementation("org.apache.iceberg:iceberg-core:${libs.versions.iceberg.get()}:tests")
111116

0 commit comments

Comments
 (0)