Skip to content

Commit 5ca3fdc

Browse files
authored
Always propagate non-credential properties from AccessConfig to clients (apache#2615)
* Always propagate non-credential properties from AccessConfig to clients This change builds on top of apache#2589 and further prepares Polaris code to support non-STS S3 implementations for apache#2589. For S3 implementations that do have STS, this change enables clients to run with local credentials (no credential vending) and still receive endpoint configuration from the catalog. * Call `SupportsCredentialDelegation.getAccessConfig()` on all relevant create/load requests (previously it was called only when `vended-credentials` was requested * Always sent `AccessConfig.extraProperties()` to clients * Expose credentials to clients only when the `vended-credentials` access delegation mode is requested. * There is not client-visible behaviour change for implementations of `PolarisStorageIntegration` that do not produce "extra" `AccessConfig` properties.
1 parent e6796f7 commit 5ca3fdc

File tree

3 files changed

+96
-34
lines changed

3 files changed

+96
-34
lines changed

runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@
1919
package org.apache.polaris.service.it;
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
22+
import static org.apache.iceberg.aws.AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT;
23+
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID;
24+
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
25+
import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;
2226
import static org.apache.iceberg.types.Types.NestedField.optional;
2327
import static org.apache.iceberg.types.Types.NestedField.required;
28+
import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
2429
import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
2530
import static org.assertj.core.api.Assertions.assertThat;
2631

@@ -42,7 +47,6 @@
4247
import org.apache.iceberg.Schema;
4348
import org.apache.iceberg.Table;
4449
import org.apache.iceberg.TableOperations;
45-
import org.apache.iceberg.aws.AwsClientProperties;
4650
import org.apache.iceberg.catalog.TableIdentifier;
4751
import org.apache.iceberg.io.FileIO;
4852
import org.apache.iceberg.io.OutputFile;
@@ -57,6 +61,7 @@
5761
import org.apache.polaris.core.admin.model.PolarisCatalog;
5862
import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
5963
import org.apache.polaris.core.admin.model.StorageConfigInfo;
64+
import org.apache.polaris.service.catalog.AccessDelegationMode;
6065
import org.apache.polaris.service.it.env.CatalogApi;
6166
import org.apache.polaris.service.it.env.ClientCredentials;
6267
import org.apache.polaris.service.it.env.ManagementApi;
@@ -74,6 +79,7 @@
7479
import org.junit.jupiter.api.TestInfo;
7580
import org.junit.jupiter.api.extension.ExtendWith;
7681
import org.junit.jupiter.params.ParameterizedTest;
82+
import org.junit.jupiter.params.provider.CsvSource;
7783
import org.junit.jupiter.params.provider.ValueSource;
7884
import software.amazon.awssdk.services.s3.S3Client;
7985
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -112,7 +118,6 @@ public Map<String, String> getConfigOverrides() {
112118
required(1, "id", Types.IntegerType.get(), "doc"),
113119
optional(2, "data", Types.StringType.get()));
114120

115-
private static ClientCredentials adminCredentials;
116121
private static PolarisApiEndpoints endpoints;
117122
private static PolarisClient client;
118123
private static ManagementApi managementApi;
@@ -131,7 +136,6 @@ static void setup(
131136
@Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY) MinioAccess minioAccess,
132137
ClientCredentials credentials) {
133138
s3Client = minioAccess.s3Client();
134-
adminCredentials = credentials;
135139
endpoints = apiEndpoints;
136140
client = polarisClient(endpoints);
137141
adminToken = client.obtainToken(credentials);
@@ -158,15 +162,19 @@ public void before(TestInfo testInfo) {
158162
}
159163

160164
private RESTCatalog createCatalog(
161-
Optional<String> endpoint, Optional<String> stsEndpoint, boolean pathStyleAccess) {
162-
return createCatalog(endpoint, stsEndpoint, pathStyleAccess, Optional.empty());
165+
Optional<String> endpoint,
166+
Optional<String> stsEndpoint,
167+
boolean pathStyleAccess,
168+
Optional<AccessDelegationMode> delegationMode) {
169+
return createCatalog(endpoint, stsEndpoint, pathStyleAccess, Optional.empty(), delegationMode);
163170
}
164171

165172
private RESTCatalog createCatalog(
166173
Optional<String> endpoint,
167174
Optional<String> stsEndpoint,
168175
boolean pathStyleAccess,
169-
Optional<String> endpointInternal) {
176+
Optional<String> endpointInternal,
177+
Optional<AccessDelegationMode> delegationMode) {
170178
AwsStorageConfigInfo.Builder storageConfig =
171179
AwsStorageConfigInfo.builder()
172180
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
@@ -198,8 +206,16 @@ private RESTCatalog createCatalog(
198206
org.apache.iceberg.CatalogProperties.URI, endpoints.catalogApiEndpoint().toString())
199207
.put(OAuth2Properties.TOKEN, authToken)
200208
.put("warehouse", catalogName)
201-
.putAll(endpoints.extraHeaders("header."))
202-
.put("header.X-Iceberg-Access-Delegation", "vended-credentials");
209+
.putAll(endpoints.extraHeaders("header."));
210+
211+
delegationMode.ifPresent(
212+
dm -> propertiesBuilder.put("header.X-Iceberg-Access-Delegation", dm.protocolValue()));
213+
214+
if (delegationMode.isEmpty()) {
215+
// Use local credentials on the client side
216+
propertiesBuilder.put("s3.access-key-id", MINIO_ACCESS_KEY);
217+
propertiesBuilder.put("s3.secret-access-key", MINIO_SECRET_KEY);
218+
}
203219

204220
restCatalog.initialize("polaris", propertiesBuilder.buildKeepingLast());
205221
return restCatalog;
@@ -213,13 +229,34 @@ public void cleanUp() {
213229
@ParameterizedTest
214230
@ValueSource(booleans = {true, false})
215231
public void testCreateTable(boolean pathStyle) throws IOException {
232+
LoadTableResponse response = doTestCreateTable(pathStyle, Optional.empty());
233+
assertThat(response.config()).doesNotContainKey(SECRET_ACCESS_KEY);
234+
assertThat(response.config()).doesNotContainKey(ACCESS_KEY_ID);
235+
assertThat(response.config()).doesNotContainKey(REFRESH_CREDENTIALS_ENDPOINT);
236+
assertThat(response.credentials()).isEmpty();
237+
}
238+
239+
@ParameterizedTest
240+
@ValueSource(booleans = {true, false})
241+
public void testCreateTableVendedCredentials(boolean pathStyle) throws IOException {
242+
LoadTableResponse response = doTestCreateTable(pathStyle, Optional.of(VENDED_CREDENTIALS));
243+
assertThat(response.config())
244+
.containsEntry(
245+
REFRESH_CREDENTIALS_ENDPOINT,
246+
"v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials");
247+
assertThat(response.credentials()).hasSize(1);
248+
}
249+
250+
private LoadTableResponse doTestCreateTable(boolean pathStyle, Optional<AccessDelegationMode> dm)
251+
throws IOException {
216252
try (RESTCatalog restCatalog =
217-
createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle)) {
218-
LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog);
253+
createCatalog(Optional.of(endpoint), Optional.empty(), pathStyle, dm)) {
254+
LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, dm);
219255
if (pathStyle) {
220256
assertThat(loadTableResponse.config())
221257
.containsEntry("s3.path-style-access", Boolean.TRUE.toString());
222258
}
259+
return loadTableResponse;
223260
}
224261
}
225262

@@ -230,7 +267,8 @@ public void testInternalEndpoints() throws IOException {
230267
Optional.of("http://s3.example.com"),
231268
Optional.of(endpoint),
232269
false,
233-
Optional.of(endpoint))) {
270+
Optional.of(endpoint),
271+
Optional.empty())) {
234272
StorageConfigInfo storageConfig =
235273
managementApi.getCatalog(catalogName).getStorageConfigInfo();
236274
assertThat((AwsStorageConfigInfo) storageConfig)
@@ -240,12 +278,13 @@ public void testInternalEndpoints() throws IOException {
240278
AwsStorageConfigInfo::getEndpointInternal,
241279
AwsStorageConfigInfo::getPathStyleAccess)
242280
.containsExactly("http://s3.example.com", endpoint, endpoint, false);
243-
LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog);
244-
assertThat(loadTableResponse.config()).containsEntry("s3.endpoint", "http://s3.example.com");
281+
LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog, Optional.empty());
282+
assertThat(loadTableResponse.config()).containsEntry(ENDPOINT, "http://s3.example.com");
245283
}
246284
}
247285

248-
public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws IOException {
286+
public LoadTableResponse doTestCreateTable(
287+
RESTCatalog restCatalog, Optional<AccessDelegationMode> dm) {
249288
catalogApi.createNamespace(catalogName, "test-ns");
250289
TableIdentifier id = TableIdentifier.of("test-ns", "t1");
251290
Table table = restCatalog.createTable(id, SCHEMA);
@@ -266,23 +305,32 @@ public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws IOExc
266305
assertThat(response.contentLength()).isGreaterThan(0);
267306

268307
LoadTableResponse loadTableResponse =
269-
catalogApi.loadTableWithAccessDelegation(catalogName, id, "ALL");
270-
assertThat(loadTableResponse.config())
271-
.containsKey("s3.endpoint")
272-
.containsEntry(
273-
AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
274-
"v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials");
308+
catalogApi.loadTable(
309+
catalogName,
310+
id,
311+
"ALL",
312+
dm.map(v -> Map.of("X-Iceberg-Access-Delegation", v.protocolValue())).orElse(Map.of()));
313+
314+
assertThat(loadTableResponse.config()).containsKey(ENDPOINT);
275315

276316
restCatalog.dropTable(id);
277317
assertThat(restCatalog.tableExists(id)).isFalse();
278318
return loadTableResponse;
279319
}
280320

281321
@ParameterizedTest
282-
@ValueSource(booleans = {true, false})
283-
public void testAppendFiles(boolean pathStyle) throws IOException {
322+
@CsvSource("true,")
323+
@CsvSource("false,")
324+
@CsvSource("true,VENDED_CREDENTIALS")
325+
@CsvSource("false,VENDED_CREDENTIALS")
326+
public void testAppendFiles(boolean pathStyle, AccessDelegationMode delegationMode)
327+
throws IOException {
284328
try (RESTCatalog restCatalog =
285-
createCatalog(Optional.of(endpoint), Optional.of(endpoint), pathStyle)) {
329+
createCatalog(
330+
Optional.of(endpoint),
331+
Optional.of(endpoint),
332+
pathStyle,
333+
Optional.ofNullable(delegationMode))) {
286334
catalogApi.createNamespace(catalogName, "test-ns");
287335
TableIdentifier id = TableIdentifier.of("test-ns", "t1");
288336
Table table = restCatalog.createTable(id, SCHEMA);
@@ -295,7 +343,8 @@ public void testAppendFiles(boolean pathStyle) throws IOException {
295343
URI.create(
296344
table
297345
.locationProvider()
298-
.newDataLocation(String.format("test-file-%s.txt", pathStyle)));
346+
.newDataLocation(
347+
String.format("test-file-%s-%s.txt", pathStyle, delegationMode)));
299348
OutputFile f1 = io.newOutputFile(loc.toString());
300349
try (PositionOutputStream os = f1.create()) {
301350
os.write("Hello World".getBytes(UTF_8));

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -794,10 +794,6 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential
794794
LoadTableResponse.Builder responseBuilder =
795795
LoadTableResponse.builder().withTableMetadata(tableMetadata);
796796

797-
if (!delegationModes.contains(VENDED_CREDENTIALS)) {
798-
return responseBuilder;
799-
}
800-
801797
if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
802798
LOGGER
803799
.atDebug()
@@ -808,15 +804,15 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential
808804
credentialDelegation.getAccessConfig(
809805
tableIdentifier, tableMetadata, actions, refreshCredentialsEndpoint);
810806
Map<String, String> credentialConfig = accessConfig.credentials();
811-
responseBuilder.addAllConfig(credentialConfig);
812-
responseBuilder.addAllConfig(accessConfig.extraProperties());
813-
if (!credentialConfig.isEmpty()) {
807+
if (!credentialConfig.isEmpty() && delegationModes.contains(VENDED_CREDENTIALS)) {
808+
responseBuilder.addAllConfig(credentialConfig);
814809
responseBuilder.addCredential(
815810
ImmutableCredential.builder()
816811
.prefix(tableMetadata.location())
817812
.config(credentialConfig)
818813
.build());
819814
}
815+
responseBuilder.addAllConfig(accessConfig.extraProperties());
820816
}
821817
return responseBuilder;
822818
}

runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.apache.polaris.service;
2020

21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
23+
2124
import com.google.auth.oauth2.AccessToken;
2225
import com.google.auth.oauth2.GoogleCredentials;
2326
import jakarta.annotation.Nonnull;
@@ -77,6 +80,9 @@
7780
import org.apache.polaris.service.task.TaskExecutor;
7881
import org.mockito.Mockito;
7982
import software.amazon.awssdk.services.sts.StsClient;
83+
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
84+
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
85+
import software.amazon.awssdk.services.sts.model.Credentials;
8086

8187
public record TestServices(
8288
Clock clock,
@@ -129,10 +135,21 @@ public static class Builder {
129135
private PolarisDiagnostics diagnostics = new PolarisDefaultDiagServiceImpl();
130136
private RealmContext realmContext = TEST_REALM;
131137
private Map<String, Object> config = Map.of();
132-
private StsClient stsClient = Mockito.mock(StsClient.class);
138+
private StsClient stsClient;
133139
private FileIOFactorySupplier fileIOFactorySupplier = MeasuredFileIOFactory::new;
134140

135-
private Builder() {}
141+
private Builder() {
142+
stsClient = Mockito.mock(StsClient.class, RETURNS_DEEP_STUBS);
143+
AssumeRoleResponse arr = Mockito.mock(AssumeRoleResponse.class, RETURNS_DEEP_STUBS);
144+
Mockito.when(stsClient.assumeRole(any(AssumeRoleRequest.class))).thenReturn(arr);
145+
Mockito.when(arr.credentials())
146+
.thenReturn(
147+
Credentials.builder()
148+
.accessKeyId("test-access-key-id-111")
149+
.secretAccessKey("test-secret-access-key-222")
150+
.sessionToken("test-session-token-333")
151+
.build());
152+
}
136153

137154
public Builder realmContext(RealmContext realmContext) {
138155
this.realmContext = realmContext;
@@ -222,7 +239,7 @@ public TestServices build() {
222239

223240
@SuppressWarnings("unchecked")
224241
Instance<ExternalCatalogFactory> externalCatalogFactory = Mockito.mock(Instance.class);
225-
Mockito.when(externalCatalogFactory.select(Mockito.any())).thenReturn(externalCatalogFactory);
242+
Mockito.when(externalCatalogFactory.select(any())).thenReturn(externalCatalogFactory);
226243
Mockito.when(externalCatalogFactory.isUnsatisfied()).thenReturn(true);
227244

228245
IcebergCatalogAdapter catalogService =

0 commit comments

Comments
 (0)