Skip to content

Commit c2209cf

Browse files
Integration tests for Catalog Federation (#2344)
Adds a Junit5 integration test for catalog federation.
1 parent 82cd416 commit c2209cf

File tree

2 files changed

+279
-0
lines changed

2 files changed

+279
-0
lines changed
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.service.it.test;
20+
21+
import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
24+
import java.net.URI;
25+
import java.util.List;
26+
import org.apache.polaris.core.admin.model.AuthenticationParameters;
27+
import org.apache.polaris.core.admin.model.Catalog;
28+
import org.apache.polaris.core.admin.model.CatalogGrant;
29+
import org.apache.polaris.core.admin.model.CatalogPrivilege;
30+
import org.apache.polaris.core.admin.model.CatalogProperties;
31+
import org.apache.polaris.core.admin.model.CatalogRole;
32+
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
33+
import org.apache.polaris.core.admin.model.ExternalCatalog;
34+
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
35+
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
36+
import org.apache.polaris.core.admin.model.OAuthClientCredentialsParameters;
37+
import org.apache.polaris.core.admin.model.PolarisCatalog;
38+
import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
39+
import org.apache.polaris.core.admin.model.StorageConfigInfo;
40+
import org.apache.polaris.service.it.env.ClientCredentials;
41+
import org.apache.polaris.service.it.env.ManagementApi;
42+
import org.apache.polaris.service.it.env.PolarisApiEndpoints;
43+
import org.apache.polaris.service.it.env.PolarisClient;
44+
import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
45+
import org.apache.polaris.service.it.ext.SparkSessionBuilder;
46+
import org.apache.spark.sql.Row;
47+
import org.apache.spark.sql.SparkSession;
48+
import org.junit.jupiter.api.AfterAll;
49+
import org.junit.jupiter.api.AfterEach;
50+
import org.junit.jupiter.api.BeforeAll;
51+
import org.junit.jupiter.api.BeforeEach;
52+
import org.junit.jupiter.api.Test;
53+
import org.junit.jupiter.api.extension.ExtendWith;
54+
import org.junit.jupiter.api.io.TempDir;
55+
56+
/**
57+
* Integration test for catalog federation functionality. This test verifies that an external
58+
* catalog can be created that federates with an internal catalog.
59+
*/
60+
@ExtendWith(PolarisIntegrationTestExtension.class)
61+
public class CatalogFederationIntegrationTest {
62+
63+
private static PolarisClient client;
64+
private static ManagementApi managementApi;
65+
private static PolarisApiEndpoints endpoints;
66+
private static ClientCredentials adminCredentials;
67+
private static SparkSession spark;
68+
private static String sparkToken;
69+
70+
private static final String PRINCIPAL_NAME = "test-catalog-federation-user";
71+
private static final String LOCAL_CATALOG_NAME = "test_catalog_local";
72+
private static final String EXTERNAL_CATALOG_NAME = "test_catalog_external";
73+
private static final String CATALOG_ROLE_NAME = "catalog_admin";
74+
private static final String PRINCIPAL_ROLE_NAME = "service_admin";
75+
76+
@TempDir static java.nio.file.Path warehouseDir;
77+
78+
private URI baseLocation;
79+
private PrincipalWithCredentials newUserCredentials;
80+
81+
@BeforeAll
82+
static void setup(PolarisApiEndpoints apiEndpoints, ClientCredentials credentials) {
83+
endpoints = apiEndpoints;
84+
client = polarisClient(endpoints);
85+
managementApi = client.managementApi(credentials);
86+
adminCredentials = credentials;
87+
sparkToken = client.obtainToken(credentials);
88+
}
89+
90+
@AfterAll
91+
static void close() throws Exception {
92+
if (client != null) {
93+
client.close();
94+
}
95+
}
96+
97+
@BeforeEach
98+
void before() {
99+
this.baseLocation = URI.create("file:///tmp/warehouse");
100+
}
101+
102+
@AfterEach
103+
void after() {
104+
if (spark != null) {
105+
SparkSession.clearDefaultSession();
106+
SparkSession.clearActiveSession();
107+
spark.close();
108+
}
109+
managementApi.dropCatalog(EXTERNAL_CATALOG_NAME);
110+
managementApi.dropCatalog(LOCAL_CATALOG_NAME);
111+
managementApi.deletePrincipal(PRINCIPAL_NAME);
112+
}
113+
114+
@Test
115+
void testCatalogFederation() {
116+
newUserCredentials = managementApi.createPrincipal(PRINCIPAL_NAME);
117+
118+
FileStorageConfigInfo storageConfig =
119+
FileStorageConfigInfo.builder()
120+
.setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
121+
.setAllowedLocations(List.of(baseLocation.toString()))
122+
.build();
123+
124+
CatalogProperties catalogProperties = new CatalogProperties(baseLocation.toString());
125+
126+
Catalog localCatalog =
127+
PolarisCatalog.builder()
128+
.setType(Catalog.TypeEnum.INTERNAL)
129+
.setName(LOCAL_CATALOG_NAME)
130+
.setProperties(catalogProperties)
131+
.setStorageConfigInfo(storageConfig)
132+
.build();
133+
managementApi.createCatalog(localCatalog);
134+
135+
CatalogGrant catalogGrant =
136+
CatalogGrant.builder()
137+
.setType(CatalogGrant.TypeEnum.CATALOG)
138+
.setPrivilege(CatalogPrivilege.TABLE_WRITE_DATA)
139+
.build();
140+
managementApi.addGrant(LOCAL_CATALOG_NAME, CATALOG_ROLE_NAME, catalogGrant);
141+
managementApi.assignPrincipalRole(PRINCIPAL_NAME, PRINCIPAL_ROLE_NAME);
142+
CatalogRole localCatalogAdminRole =
143+
managementApi.getCatalogRole(LOCAL_CATALOG_NAME, CATALOG_ROLE_NAME);
144+
managementApi.grantCatalogRoleToPrincipalRole(
145+
PRINCIPAL_ROLE_NAME, LOCAL_CATALOG_NAME, localCatalogAdminRole);
146+
147+
AuthenticationParameters authParams =
148+
OAuthClientCredentialsParameters.builder()
149+
.setAuthenticationType(AuthenticationParameters.AuthenticationTypeEnum.OAUTH)
150+
.setTokenUri(endpoints.catalogApiEndpoint().toString() + "/v1/oauth/tokens")
151+
.setClientId(newUserCredentials.getCredentials().getClientId())
152+
.setClientSecret(newUserCredentials.getCredentials().getClientSecret())
153+
.setScopes(List.of("PRINCIPAL_ROLE:ALL"))
154+
.build();
155+
ConnectionConfigInfo connectionConfig =
156+
IcebergRestConnectionConfigInfo.builder()
157+
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST)
158+
.setUri(endpoints.catalogApiEndpoint().toString())
159+
.setRemoteCatalogName(LOCAL_CATALOG_NAME)
160+
.setAuthenticationParameters(authParams)
161+
.build();
162+
ExternalCatalog externalCatalog =
163+
ExternalCatalog.builder()
164+
.setType(Catalog.TypeEnum.EXTERNAL)
165+
.setName(EXTERNAL_CATALOG_NAME)
166+
.setConnectionConfigInfo(connectionConfig)
167+
.setProperties(catalogProperties)
168+
.setStorageConfigInfo(storageConfig)
169+
.build();
170+
managementApi.createCatalog(externalCatalog);
171+
172+
managementApi.addGrant(EXTERNAL_CATALOG_NAME, CATALOG_ROLE_NAME, catalogGrant);
173+
CatalogRole externalCatalogAdminRole =
174+
managementApi.getCatalogRole(EXTERNAL_CATALOG_NAME, CATALOG_ROLE_NAME);
175+
managementApi.grantCatalogRoleToPrincipalRole(
176+
PRINCIPAL_ROLE_NAME, EXTERNAL_CATALOG_NAME, externalCatalogAdminRole);
177+
178+
spark =
179+
SparkSessionBuilder.buildWithTestDefaults()
180+
.withWarehouse(warehouseDir.toUri())
181+
.addCatalog(
182+
LOCAL_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken)
183+
.addCatalog(
184+
EXTERNAL_CATALOG_NAME,
185+
"org.apache.iceberg.spark.SparkCatalog",
186+
endpoints,
187+
sparkToken)
188+
.getOrCreate();
189+
190+
spark.sql("USE " + LOCAL_CATALOG_NAME);
191+
spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1");
192+
spark.sql("CREATE TABLE IF NOT EXISTS ns1.test_table (id int, name string)");
193+
spark.sql("INSERT INTO ns1.test_table VALUES (1, 'Alice')");
194+
spark.sql("INSERT INTO ns1.test_table VALUES (2, 'Bob')");
195+
196+
spark.sql("CREATE NAMESPACE IF NOT EXISTS ns2");
197+
spark.sql("CREATE TABLE IF NOT EXISTS ns2.test_table (id int, name string)");
198+
spark.sql("INSERT INTO ns2.test_table VALUES (1, 'Apache Spark')");
199+
spark.sql("INSERT INTO ns2.test_table VALUES (2, 'Apache Iceberg')");
200+
201+
spark.sql("USE " + EXTERNAL_CATALOG_NAME);
202+
List<Row> namespaces = spark.sql("SHOW NAMESPACES").collectAsList();
203+
assertThat(namespaces).hasSize(2);
204+
205+
List<Row> ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
206+
assertThat(ns1Data).hasSize(2);
207+
assertThat(ns1Data.get(0).getInt(0)).isEqualTo(1);
208+
assertThat(ns1Data.get(0).getString(1)).isEqualTo("Alice");
209+
assertThat(ns1Data.get(1).getInt(0)).isEqualTo(2);
210+
assertThat(ns1Data.get(1).getString(1)).isEqualTo("Bob");
211+
spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')");
212+
List<Row> ns2Data = spark.sql("SELECT * FROM ns2.test_table ORDER BY id").collectAsList();
213+
assertThat(ns2Data).hasSize(2);
214+
assertThat(ns2Data.get(0).getInt(0)).isEqualTo(1);
215+
assertThat(ns2Data.get(0).getString(1)).isEqualTo("Apache Spark");
216+
assertThat(ns2Data.get(1).getInt(0)).isEqualTo(2);
217+
assertThat(ns2Data.get(1).getString(1)).isEqualTo("Apache Iceberg");
218+
spark.sql("INSERT INTO ns2.test_table VALUES (3, 'Apache Polaris')");
219+
220+
spark.sql("USE " + LOCAL_CATALOG_NAME);
221+
spark.sql("REFRESH TABLE ns1.test_table");
222+
spark.sql("REFRESH TABLE ns2.test_table");
223+
List<Row> updatedNs1Data =
224+
spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
225+
assertThat(updatedNs1Data).hasSize(3);
226+
assertThat(updatedNs1Data.get(2).getInt(0)).isEqualTo(3);
227+
assertThat(updatedNs1Data.get(2).getString(1)).isEqualTo("Charlie");
228+
List<Row> updatedNs2Data =
229+
spark.sql("SELECT * FROM ns2.test_table ORDER BY id").collectAsList();
230+
assertThat(updatedNs2Data).hasSize(3);
231+
assertThat(updatedNs2Data.get(2).getInt(0)).isEqualTo(3);
232+
assertThat(updatedNs2Data.get(2).getString(1)).isEqualTo("Apache Polaris");
233+
234+
spark.sql("DROP TABLE ns1.test_table");
235+
spark.sql("DROP TABLE ns2.test_table");
236+
spark.sql("DROP NAMESPACE ns1");
237+
spark.sql("DROP NAMESPACE ns2");
238+
}
239+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.service.spark.it;
20+
21+
import io.quarkus.test.junit.QuarkusIntegrationTest;
22+
import io.quarkus.test.junit.QuarkusTestProfile;
23+
import io.quarkus.test.junit.TestProfile;
24+
import java.util.Map;
25+
import org.apache.polaris.service.it.test.CatalogFederationIntegrationTest;
26+
27+
@TestProfile(CatalogFederationIT.CatalogFederationProfile.class)
28+
@QuarkusIntegrationTest
29+
public class CatalogFederationIT extends CatalogFederationIntegrationTest {
30+
31+
public static class CatalogFederationProfile implements QuarkusTestProfile {
32+
@Override
33+
public Map<String, String> getConfigOverrides() {
34+
return Map.of(
35+
"polaris.features.\"ENABLE_CATALOG_FEDERATION\"", "true",
36+
"polaris.features.\"SUPPORTED_CATALOG_CONNECTION_TYPES\"", "[\"ICEBERG_REST\"]",
37+
"polaris.features.\"ALLOW_OVERLAPPING_CATALOG_URLS\"", "true");
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)