Skip to content

Commit 04a9950

Browse files
authored
[Catalog Federation] Enable Credential Vending for Passthrough Facade Catalog (#2784)
This PR introduces credential vending support for passthrough-facade catalogs. When creating a passthrough-facade catalog, the configuration currently requires two components: StorageConfig – specifies the storage info for the remote catalog. ConnectionInfo – defines connection parameters for the underlying remote catalog. With this change, the StorageConfig is now also used to vend temporary credentials for user requests. Credential vending honors table-level RBAC policies to determine whether to issue read-only or read-write credentials, ensuring access control consistency with Polaris authorization semantics. A new test case validates the credential vending workflow, verifying both read and write credential vending. Note: the remote catalog referenced by the passthrough-facade does not need to support IRC
1 parent 622031a commit 04a9950

File tree

6 files changed

+178
-50
lines changed

6 files changed

+178
-50
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
3535

3636
### New Features
3737

38+
- Support credential vending for federated catalogs. `ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING` (default: true) was added to toggle this feature.
39+
3840
### Changes
3941

4042
### Deprecations

integration-tests/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ dependencies {
6767
implementation(libs.awaitility)
6868
implementation(libs.s3mock.testcontainers)
6969
implementation(project(":polaris-runtime-test-common"))
70+
implementation(project(":polaris-minio-testcontainer"))
7071
}
7172

7273
copiedCodeChecks {

integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java

Lines changed: 101 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727
import java.util.UUID;
2828
import org.apache.iceberg.exceptions.ForbiddenException;
2929
import org.apache.polaris.core.admin.model.AuthenticationParameters;
30+
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
3031
import org.apache.polaris.core.admin.model.Catalog;
3132
import org.apache.polaris.core.admin.model.CatalogGrant;
3233
import org.apache.polaris.core.admin.model.CatalogPrivilege;
3334
import org.apache.polaris.core.admin.model.CatalogProperties;
3435
import org.apache.polaris.core.admin.model.CatalogRole;
3536
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
3637
import org.apache.polaris.core.admin.model.ExternalCatalog;
37-
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
3838
import org.apache.polaris.core.admin.model.GrantResource;
3939
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
4040
import org.apache.polaris.core.admin.model.NamespaceGrant;
@@ -52,6 +52,9 @@
5252
import org.apache.polaris.service.it.env.PolarisClient;
5353
import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
5454
import org.apache.polaris.service.it.ext.SparkSessionBuilder;
55+
import org.apache.polaris.test.minio.Minio;
56+
import org.apache.polaris.test.minio.MinioAccess;
57+
import org.apache.polaris.test.minio.MinioExtension;
5558
import org.apache.spark.sql.Row;
5659
import org.apache.spark.sql.SparkSession;
5760
import org.junit.jupiter.api.AfterAll;
@@ -66,9 +69,14 @@
6669
* Integration test for catalog federation functionality. This test verifies that an external
6770
* catalog can be created that federates with an internal catalog.
6871
*/
72+
@ExtendWith(MinioExtension.class)
6973
@ExtendWith(PolarisIntegrationTestExtension.class)
7074
public class CatalogFederationIntegrationTest {
7175

76+
public static final String BUCKET_URI_PREFIX = "/minio-test-catalog-federation";
77+
public static final String MINIO_ACCESS_KEY = "test-ak-123-catalog-federation";
78+
public static final String MINIO_SECRET_KEY = "test-sk-123-catalog-federation";
79+
7280
private static PolarisClient client;
7381
private static CatalogApi catalogApi;
7482
private static ManagementApi managementApi;
@@ -78,6 +86,8 @@ public class CatalogFederationIntegrationTest {
7886
private static String federatedCatalogName;
7987
private static String localCatalogRoleName;
8088
private static String federatedCatalogRoleName;
89+
private static URI storageBase;
90+
private static String endpoint;
8191

8292
private static final String PRINCIPAL_NAME = "test-catalog-federation-user";
8393
private static final String PRINCIPAL_ROLE_NAME = "test-catalog-federation-user-role";
@@ -93,12 +103,17 @@ public class CatalogFederationIntegrationTest {
93103
private PrincipalWithCredentials newUserCredentials;
94104

95105
@BeforeAll
96-
static void setup(PolarisApiEndpoints apiEndpoints, ClientCredentials credentials) {
106+
static void setup(
107+
PolarisApiEndpoints apiEndpoints,
108+
ClientCredentials credentials,
109+
@Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY) MinioAccess minioAccess) {
97110
endpoints = apiEndpoints;
98111
client = polarisClient(endpoints);
99112
String adminToken = client.obtainToken(credentials);
100113
managementApi = client.managementApi(adminToken);
101114
catalogApi = client.catalogApi(adminToken);
115+
storageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX);
116+
endpoint = minioAccess.s3endpoint();
102117
}
103118

104119
@AfterAll
@@ -129,12 +144,14 @@ void after() {
129144
}
130145

131146
private void setupCatalogs() {
132-
baseLocation = URI.create("file:///tmp/warehouse");
147+
baseLocation = storageBase;
133148
newUserCredentials = managementApi.createPrincipalWithRole(PRINCIPAL_NAME, PRINCIPAL_ROLE_NAME);
134149

135-
FileStorageConfigInfo storageConfig =
136-
FileStorageConfigInfo.builder()
137-
.setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
150+
AwsStorageConfigInfo storageConfig =
151+
AwsStorageConfigInfo.builder()
152+
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
153+
.setPathStyleAccess(true)
154+
.setEndpoint(endpoint)
138155
.setAllowedLocations(List.of(baseLocation.toString()))
139156
.build();
140157

@@ -197,6 +214,14 @@ private void setupCatalogs() {
197214
spark =
198215
SparkSessionBuilder.buildWithTestDefaults()
199216
.withWarehouse(warehouseDir.toUri())
217+
.withConfig(
218+
"spark.sql.catalog." + localCatalogName + ".header.X-Iceberg-Access-Delegation",
219+
"vended-credentials")
220+
.withConfig(
221+
"spark.sql.catalog." + federatedCatalogName + ".header.X-Iceberg-Access-Delegation",
222+
"vended-credentials")
223+
.withConfig("spark.sql.catalog." + localCatalogName + ".cache-enabled", "false")
224+
.withConfig("spark.sql.catalog." + federatedCatalogName + ".cache-enabled", "false")
200225
.addCatalog(
201226
localCatalogName, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken)
202227
.addCatalog(
@@ -296,10 +321,6 @@ void testFederatedCatalogWithNamespaceRBAC() {
296321
.sql("SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id")
297322
.collectAsList();
298323
assertThat(localNs2Data).hasSize(2);
299-
300-
// Restore the grant
301-
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, namespaceGrant);
302-
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
303324
}
304325

305326
@Test
@@ -335,9 +356,76 @@ void testFederatedCatalogWithTableRBAC() {
335356
.sql("SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id")
336357
.collectAsList();
337358
assertThat(localNs2Data).hasSize(2);
359+
}
338360

339-
// Restore the grant
340-
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant);
341-
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
361+
@Test
362+
void testFederatedCatalogWithCredentialVending() {
363+
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
364+
365+
// Case 1: Only have TABLE_READ_PROPERTIES privilege, should not be able to read data
366+
TableGrant tablePropertiesGrant =
367+
TableGrant.builder()
368+
.setType(GrantResource.TypeEnum.TABLE)
369+
.setPrivilege(TablePrivilege.TABLE_READ_PROPERTIES)
370+
.setNamespace(List.of("ns1"))
371+
.setTableName("test_table")
372+
.build();
373+
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tablePropertiesGrant);
374+
spark.sql("USE " + federatedCatalogName);
375+
376+
// Read table data should fail since TABLE_READ_PROPERTIES does not allow reading data
377+
assertThatThrownBy(() -> spark.sql("SELECT * FROM ns1.test_table ORDER BY id"))
378+
.isInstanceOf(ForbiddenException.class);
379+
380+
// Case 2: Only have TABLE_READ_DATA privilege, should be able to read data but not write
381+
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tablePropertiesGrant);
382+
TableGrant tableReadDataGrant =
383+
TableGrant.builder()
384+
.setType(GrantResource.TypeEnum.TABLE)
385+
.setPrivilege(TablePrivilege.TABLE_READ_DATA)
386+
.setNamespace(List.of("ns1"))
387+
.setTableName("test_table")
388+
.build();
389+
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant);
390+
391+
// Verify that the vended credential allows reading the data
392+
List<Row> ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
393+
assertThat(ns1Data).hasSize(2);
394+
assertThat(ns1Data.get(0).getInt(0)).isEqualTo(1);
395+
assertThat(ns1Data.get(0).getString(1)).isEqualTo("Alice");
396+
397+
// Verify that write is blocked since the vended credential should only have read permission
398+
assertThatThrownBy(() -> spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')"))
399+
.hasMessageContaining(
400+
"software.amazon.awssdk.services.s3.model.S3Exception: Access Denied. (Service: S3, Status Code: 403,");
401+
402+
// Case 3: TABLE_WRITE_DATA should
403+
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant);
404+
TableGrant tableWriteDataGrant =
405+
TableGrant.builder()
406+
.setType(GrantResource.TypeEnum.TABLE)
407+
.setPrivilege(TablePrivilege.TABLE_WRITE_DATA)
408+
.setNamespace(List.of("ns1"))
409+
.setTableName("test_table")
410+
.build();
411+
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableWriteDataGrant);
412+
413+
spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')");
414+
415+
// Verify the write was successful by reading back
416+
List<Row> updatedData = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
417+
assertThat(updatedData).hasSize(3);
418+
assertThat(updatedData.get(2).getInt(0)).isEqualTo(3);
419+
assertThat(updatedData.get(2).getString(1)).isEqualTo("Charlie");
420+
421+
// Verify the data is also visible from the local catalog (both point to same storage)
422+
spark.sql(String.format("REFRESH TABLE %s.ns1.test_table", localCatalogName));
423+
List<Row> localData =
424+
spark
425+
.sql(String.format("SELECT * FROM %s.ns1.test_table ORDER BY id", localCatalogName))
426+
.collectAsList();
427+
assertThat(localData).hasSize(3);
428+
assertThat(localData.get(2).getInt(0)).isEqualTo(3);
429+
assertThat(localData.get(2).getString(1)).isEqualTo("Charlie");
342430
}
343431
}

polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,4 +429,13 @@ public static void enforceFeatureEnabledOrThrow(
429429
"When true, enables finer grained update table privileges which are passed to the authorizer for update table operations")
430430
.defaultValue(true)
431431
.buildFeatureConfiguration();
432+
433+
public static final FeatureConfiguration<Boolean> ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING =
434+
PolarisConfiguration.<Boolean>builder()
435+
.key("ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING")
436+
.catalogConfig("polaris.config.allow-federated-catalogs-credential-vending")
437+
.description(
438+
"If set to true (default), allow credential vending for external catalogs. Note this requires ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING to be true first.")
439+
.defaultValue(true)
440+
.buildFeatureConfiguration();
432441
}

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.polaris.service.catalog.iceberg;
2020

21+
import static org.apache.polaris.core.config.FeatureConfiguration.ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING;
2122
import static org.apache.polaris.core.config.FeatureConfiguration.LIST_PAGINATION_ENABLED;
2223
import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
2324

@@ -265,6 +266,10 @@ protected void initializeCatalog() {
265266
throw new UnsupportedOperationException(
266267
"External catalog factory for type '" + connectionType + "' is unavailable.");
267268
}
269+
// TODO: if the remote catalog is not RestCatalog, the corresponding table operation will use
270+
// environment to load the table metadata, the env may not contain credentials to access the
271+
// storage. In the future, we could leverage PolarisCredentialManager to inject storage
272+
// credentials for non-rest remote catalog
268273
this.baseCatalog = federatedCatalog;
269274
} else {
270275
LOGGER.atInfo().log("Initializing non-federated catalog");
@@ -421,6 +426,12 @@ public void authorizeCreateTableDirect(
421426
PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION,
422427
TableIdentifier.of(namespace, request.name()));
423428
}
429+
430+
CatalogEntity catalog = getResolvedCatalogEntity();
431+
if (catalog.isStaticFacade()) {
432+
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
433+
}
434+
checkAllowExternalCatalogCredentialVending(delegationModes);
424435
}
425436

426437
public LoadTableResponse createTableDirect(
@@ -431,10 +442,6 @@ public LoadTableResponse createTableDirect(
431442

432443
authorizeCreateTableDirect(namespace, request, delegationModes);
433444

434-
CatalogEntity catalog = getResolvedCatalogEntity();
435-
if (catalog.isStaticFacade()) {
436-
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
437-
}
438445
request.validate();
439446

440447
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name());
@@ -545,6 +552,12 @@ private void authorizeCreateTableStaged(
545552
PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION,
546553
TableIdentifier.of(namespace, request.name()));
547554
}
555+
556+
CatalogEntity catalog = getResolvedCatalogEntity();
557+
if (catalog.isStaticFacade()) {
558+
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
559+
}
560+
checkAllowExternalCatalogCredentialVending(delegationModes);
548561
}
549562

550563
public LoadTableResponse createTableStaged(
@@ -555,10 +568,6 @@ public LoadTableResponse createTableStaged(
555568

556569
authorizeCreateTableStaged(namespace, request, delegationModes);
557570

558-
CatalogEntity catalog = getResolvedCatalogEntity();
559-
if (catalog.isStaticFacade()) {
560-
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
561-
}
562571
TableIdentifier ident = TableIdentifier.of(namespace, request.name());
563572
TableMetadata metadata = stageTableCreateHelper(namespace, request);
564573

@@ -723,23 +732,7 @@ private Set<PolarisStorageActions> authorizeLoadTable(
723732
read, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier);
724733
}
725734

726-
CatalogEntity catalogEntity = getResolvedCatalogEntity();
727-
728-
LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType());
729-
LOGGER.info(
730-
"allow external catalog credential vending: {}",
731-
realmConfig.getConfig(
732-
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity));
733-
if (catalogEntity
734-
.getCatalogType()
735-
.equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL)
736-
&& !realmConfig.getConfig(
737-
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)) {
738-
throw new ForbiddenException(
739-
"Access Delegation is not enabled for this catalog. Please consult applicable "
740-
+ "documentation for the catalog config property '%s' to enable this feature",
741-
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig());
742-
}
735+
checkAllowExternalCatalogCredentialVending(delegationModes);
743736

744737
return actionsRequested;
745738
}
@@ -808,8 +801,15 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential
808801
PolarisResolvedPathWrapper resolvedStoragePath =
809802
CatalogUtils.findResolvedStorageEntity(resolutionManifest, tableIdentifier);
810803

811-
if (baseCatalog instanceof IcebergCatalog && resolvedStoragePath != null) {
804+
if (resolvedStoragePath == null) {
805+
LOGGER.debug(
806+
"Unable to find storage configuration information for table {}", tableIdentifier);
807+
return responseBuilder;
808+
}
812809

810+
if (baseCatalog instanceof IcebergCatalog
811+
|| realmConfig.getConfig(
812+
ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) {
813813
AccessConfig accessConfig =
814814
accessConfigProvider.getAccessConfig(
815815
callContext,
@@ -838,6 +838,7 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential
838838
}
839839
responseBuilder.addAllConfig(accessConfig.extraProperties());
840840
}
841+
841842
return responseBuilder;
842843
}
843844

@@ -1211,6 +1212,31 @@ private EnumSet<PolarisAuthorizableOperation> getUpdateTableAuthorizableOperatio
12111212
}
12121213
}
12131214

1215+
private void checkAllowExternalCatalogCredentialVending(
1216+
EnumSet<AccessDelegationMode> delegationModes) {
1217+
1218+
if (delegationModes.isEmpty()) {
1219+
return;
1220+
}
1221+
CatalogEntity catalogEntity = getResolvedCatalogEntity();
1222+
1223+
LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType());
1224+
LOGGER.info(
1225+
"allow external catalog credential vending: {}",
1226+
realmConfig.getConfig(
1227+
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity));
1228+
if (catalogEntity
1229+
.getCatalogType()
1230+
.equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL)
1231+
&& !realmConfig.getConfig(
1232+
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)) {
1233+
throw new ForbiddenException(
1234+
"Access Delegation is not enabled for this catalog. Please consult applicable "
1235+
+ "documentation for the catalog config property '%s' to enable this feature",
1236+
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig());
1237+
}
1238+
}
1239+
12141240
@Override
12151241
public void close() throws Exception {
12161242
if (baseCatalog instanceof Closeable closeable) {

0 commit comments

Comments
 (0)