Commit 93938fd
Update Spark client to use the shaded iceberg-core in iceberg-spark-runtime to avoid Spark compatibility issues (#1908)
* add change
* add comment
* update change
* add comment
* add change
* add tests
* add comment
* clean up style check
* update build
* Revert "Reuse shadowJar for spark client bundle jar maven publish (#1857)" This reverts commit 1f7f127.
* Reuse shadowJar for spark client bundle jar maven publish (#1857)
* fix spark client
* fix test failure and address feedback
* fix error
* update regression test
* update classifier name
* address comment
* add change
* update doc
* update build and readme
* add back jr
* update dependency
* add change
* update
* update tests
* remove merge service file
* update readme
* update readme
* update checkstyle
* rebase with main
* Revert "Reuse shadowJar for spark client bundle jar maven publish (#1857)" This reverts commit 40f4d36.
* update checkstyle
* revert change
* address comments
* trigger tests
1 parent fe81542

File tree: 21 files changed, +961 −81 lines changed


integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@
  * @see PolarisClient#catalogApi(ClientCredentials)
  */
 public class CatalogApi extends RestApi {
-  CatalogApi(Client client, PolarisApiEndpoints endpoints, String authToken, URI uri) {
+  public CatalogApi(Client client, PolarisApiEndpoints endpoints, String authToken, URI uri) {
     super(client, endpoints, authToken, uri);
   }
 

integration-tests/src/main/java/org/apache/polaris/service/it/env/ManagementApi.java

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@
  * @see PolarisClient#managementApi(ClientCredentials)
  */
 public class ManagementApi extends RestApi {
-  ManagementApi(Client client, PolarisApiEndpoints endpoints, String authToken, URI uri) {
+  public ManagementApi(Client client, PolarisApiEndpoints endpoints, String authToken, URI uri) {
     super(client, endpoints, authToken, uri);
   }
 
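Both constructors above are widened from package-private to public so that test code outside the org.apache.polaris.service.it.env package (such as the new PolarisManagementClient below) can construct the API wrappers directly. A minimal sketch of what this enables; the helper class here is hypothetical and not part of this commit:

package org.apache.polaris.spark.quarkus.it;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import org.apache.polaris.service.it.env.ManagementApi;
import org.apache.polaris.service.it.env.PolarisApiEndpoints;

// Hypothetical helper: with the now-public constructor, code in another
// package can build ManagementApi directly instead of going through
// PolarisClient.
final class ManagementApiExample {
  static ManagementApi managementApi(PolarisApiEndpoints endpoints, String authToken) {
    Client client = ClientBuilder.newClient();
    return new ManagementApi(client, endpoints, authToken, endpoints.managementApiEndpoint());
  }
}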

plugins/spark/v3.5/integration/build.gradle.kts

Lines changed: 6 additions & 8 deletions
@@ -45,9 +45,13 @@ dependencies {
 
   implementation(project(":polaris-runtime-service"))
 
-  testImplementation(project(":polaris-api-management-model"))
+  testImplementation(
+    "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
+  )
   testImplementation(project(":polaris-spark-${sparkMajorVersion}_${scalaVersion}"))
 
+  testImplementation(project(":polaris-api-management-model"))
+
   testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
     // exclude log4j dependencies. Explicit dependencies for the log4j libraries are
     // enforced below to ensure the version compatibility
@@ -64,13 +68,7 @@ dependencies {
   testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
 
   testImplementation(platform(libs.jackson.bom))
-  testImplementation("com.fasterxml.jackson.core:jackson-annotations")
-  testImplementation("com.fasterxml.jackson.core:jackson-core")
-  testImplementation("com.fasterxml.jackson.core:jackson-databind")
-
-  testImplementation(
-    "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
-  )
+  testImplementation("com.fasterxml.jackson.jakarta.rs:jackson-jakarta-rs-json-provider")
 
   testImplementation(testFixtures(project(":polaris-runtime-service")))
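The reshuffle above leaves iceberg-spark-runtime as the single source of Iceberg classes on the test classpath and swaps the standalone Jackson core artifacts for the jackson-jakarta-rs-json-provider the test client needs. When two copies of the same class are resolvable (for example, from a transitively pulled iceberg-core and the copy bundled in iceberg-spark-runtime), which one loads depends on classpath order and can surface as confusing NoSuchMethodError or ClassCastException failures. A small self-contained sketch for checking which jar a class actually loaded from; the default class name is only an example:

// Prints the jar a class was loaded from; run it with the suspect classpath.
// Note: getCodeSource() can return null for JDK bootstrap classes.
public final class WhichJar {
  public static void main(String[] args) throws ClassNotFoundException {
    String name = args.length > 0 ? args[0] : "org.apache.iceberg.rest.HTTPClient";
    Class<?> clazz = Class.forName(name);
    System.out.println(name + " -> " + clazz.getProtectionDomain().getCodeSource().getLocation());
  }
}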

plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/PolarisManagementClient.java

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.polaris.service.it.ext.PolarisServerManagerLoader.polarisServerManager;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider;
+import jakarta.ws.rs.client.Client;
+import jakarta.ws.rs.client.ClientBuilder;
+import java.util.Map;
+import java.util.Random;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.AuthSession;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.polaris.service.it.env.ClientCredentials;
+import org.apache.polaris.service.it.env.ManagementApi;
+import org.apache.polaris.service.it.env.PolarisApiEndpoints;
+
+/**
+ * This class provides a REST client for the Polaris Management service endpoints and its auth-token
+ * endpoint, which is used in Spark client tests to run commands that Spark SQL can't issue directly
+ * (e.g., createCatalog).
+ */
+public final class PolarisManagementClient implements AutoCloseable {
+  private final PolarisApiEndpoints endpoints;
+  private final Client client;
+  // Use an alphanumeric ID for widest compatibility in HTTP and SQL.
+  // Use MAX_RADIX for shorter output.
+  private final String clientId =
+      Long.toString(Math.abs(new Random().nextLong()), Character.MAX_RADIX);
+  // An Iceberg REST client used to fetch auth tokens.
+  private final RESTClient restClient;
+
+  private PolarisManagementClient(PolarisApiEndpoints endpoints) {
+    this.endpoints = endpoints;
+
+    this.client =
+        ClientBuilder.newBuilder()
+            .readTimeout(5, MINUTES)
+            .connectTimeout(1, MINUTES)
+            .register(new JacksonJsonProvider(new ObjectMapper()))
+            .build();
+
+    this.restClient = HTTPClient.builder(Map.of()).uri(endpoints.catalogApiEndpoint()).build();
+  }
+
+  public static PolarisManagementClient managementClient(PolarisApiEndpoints endpoints) {
+    return new PolarisManagementClient(endpoints);
+  }
+
+  /** This method should be used by test code to make top-level entity names. */
+  public String newEntityName(String hint) {
+    return polarisServerManager().transformEntityName(hint + "_" + clientId);
+  }
+
+  public ManagementApi managementApi(String authToken) {
+    return new ManagementApi(client, endpoints, authToken, endpoints.managementApiEndpoint());
+  }
+
+  public ManagementApi managementApi(ClientCredentials credentials) {
+    return managementApi(obtainToken(credentials));
+  }
+
+  /** Requests an access token from the Polaris server for the given {@link ClientCredentials}. */
+  public String obtainToken(ClientCredentials credentials) {
+    OAuthTokenResponse response =
+        OAuth2Util.fetchToken(
+            restClient.withAuthSession(AuthSession.EMPTY),
+            Map.of(),
+            String.format("%s:%s", credentials.clientId(), credentials.clientSecret()),
+            "PRINCIPAL_ROLE:ALL",
+            endpoints.catalogApiEndpoint() + "/v1/oauth/tokens",
+            Map.of("grant_type", "client_credentials"));
+    return response.token();
+  }
+
+  @Override
+  public void close() throws Exception {
+    client.close();
+    restClient.close();
+  }
+}
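
A sketch of how a test might use this client end to end; in real suites the endpoints and credentials are injected by the test harness rather than constructed by hand, and the names below are placeholders:

package org.apache.polaris.spark.quarkus.it;

import org.apache.polaris.service.it.env.ClientCredentials;
import org.apache.polaris.service.it.env.ManagementApi;
import org.apache.polaris.service.it.env.PolarisApiEndpoints;

// Illustrative flow only; not part of this commit.
final class ManagementClientUsage {
  static void example(PolarisApiEndpoints endpoints, ClientCredentials credentials)
      throws Exception {
    try (PolarisManagementClient client = PolarisManagementClient.managementClient(endpoints)) {
      String token = client.obtainToken(credentials); // OAuth2 client-credentials flow
      ManagementApi api = client.managementApi(token); // authenticated management client
      String catalogName = client.newEntityName("demo_catalog"); // unique, SQL-safe name
      // ... manage catalogs via `api`, e.g. api.deleteCatalog(catalogName);
    }
  }
}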

plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java

Lines changed: 3 additions & 0 deletions
@@ -27,6 +27,9 @@ public class SparkCatalogIcebergIT extends SparkCatalogBaseIT {
   @Override
   protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
     return builder
+        .config(
+            "spark.sql.extensions",
+            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config(
             String.format("spark.sql.catalog.%s", catalogName),
             "org.apache.iceberg.spark.SparkCatalog")

plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java

Lines changed: 149 additions & 3 deletions
@@ -18,23 +18,133 @@
  */
 package org.apache.polaris.spark.quarkus.it;
 
+import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
 import com.google.common.collect.ImmutableList;
 import com.google.errorprone.annotations.FormatMethod;
 import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Path;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.DirectoryFileFilter;
 import org.apache.commons.io.filefilter.FalseFileFilter;
-import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.service.it.env.ClientCredentials;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.polaris.service.it.env.ManagementApi;
+import org.apache.polaris.service.it.env.PolarisApiEndpoints;
+import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.intellij.lang.annotations.Language;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.LoggerFactory;
 
-public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBase {
+@ExtendWith(PolarisIntegrationTestExtension.class)
+public abstract class SparkIntegrationBase {
+  protected static final S3MockContainer s3Container =
+      new S3MockContainer("3.11.0").withInitialBuckets("my-bucket,my-old-bucket");
+  protected static SparkSession spark;
+  protected PolarisApiEndpoints endpoints;
+  protected PolarisManagementClient client;
+  protected ManagementApi managementApi;
+  protected String catalogName;
+  protected String sparkToken;
+
+  protected URI warehouseDir;
+
+  @BeforeAll
+  public static void setup() throws IOException {
+    s3Container.start();
+  }
+
+  @AfterAll
+  public static void cleanup() {
+    s3Container.stop();
+  }
+
+  @BeforeEach
+  public void before(
+      PolarisApiEndpoints apiEndpoints, ClientCredentials credentials, @TempDir Path tempDir) {
+    endpoints = apiEndpoints;
+    client = PolarisManagementClient.managementClient(endpoints);
+    sparkToken = client.obtainToken(credentials);
+    managementApi = client.managementApi(credentials);
+
+    warehouseDir = IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve("spark-warehouse");
+
+    catalogName = client.newEntityName("spark_catalog");
+
+    AwsStorageConfigInfo awsConfigModel =
+        AwsStorageConfigInfo.builder()
+            .setRoleArn("arn:aws:iam::123456789012:role/my-role")
+            .setExternalId("externalId")
+            .setUserArn("userArn")
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+            .setAllowedLocations(List.of("s3://my-old-bucket/path/to/data"))
+            .build();
+    CatalogProperties props = new CatalogProperties("s3://my-bucket/path/to/data");
+    props.putAll(
+        Map.of(
+            "table-default.s3.endpoint",
+            s3Container.getHttpEndpoint(),
+            "table-default.s3.path-style-access",
+            "true",
+            "table-default.s3.access-key-id",
+            "foo",
+            "table-default.s3.secret-access-key",
+            "bar",
+            "s3.endpoint",
+            s3Container.getHttpEndpoint(),
+            "s3.path-style-access",
+            "true",
+            "s3.access-key-id",
+            "foo",
+            "s3.secret-access-key",
+            "bar",
+            "polaris.config.drop-with-purge.enabled",
+            "true"));
+    Catalog catalog =
+        PolarisCatalog.builder()
+            .setType(Catalog.TypeEnum.INTERNAL)
+            .setName(catalogName)
+            .setProperties(props)
+            .setStorageConfigInfo(awsConfigModel)
+            .build();
+
+    managementApi.createCatalog(catalog);
+
+    SparkSession.Builder sessionBuilder =
+        SparkSession.builder()
+            .master("local[1]")
+            .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+            .config(
+                "spark.hadoop.fs.s3.aws.credentials.provider",
+                "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
+            .config("spark.hadoop.fs.s3.access.key", "foo")
+            .config("spark.hadoop.fs.s3.secret.key", "bar")
+            .config("spark.ui.showConsoleProgress", false)
+            .config("spark.ui.enabled", "false");
+    spark = withCatalog(sessionBuilder, catalogName).getOrCreate();
+
+    onSpark("USE " + catalogName);
+  }
 
-  @Override
   protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
     return builder
         .config(
@@ -61,6 +171,38 @@ protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String
         .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2");
   }
 
+  @AfterEach
+  public void after() throws Exception {
+    cleanupCatalog(catalogName);
+    try {
+      SparkSession.clearDefaultSession();
+      SparkSession.clearActiveSession();
+      spark.close();
+    } catch (Exception e) {
+      LoggerFactory.getLogger(getClass()).error("Unable to close spark session", e);
+    }
+
+    client.close();
+  }
+
+  protected void cleanupCatalog(String catalogName) {
+    onSpark("USE " + catalogName);
+    List<Row> namespaces = onSpark("SHOW NAMESPACES").collectAsList();
+    for (Row namespace : namespaces) {
+      List<Row> tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList();
+      for (Row table : tables) {
+        onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1));
+      }
+      List<Row> views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList();
+      for (Row view : views) {
+        onSpark("DROP VIEW " + namespace.getString(0) + "." + view.getString(1));
+      }
+      onSpark("DROP NAMESPACE " + namespace.getString(0));
+    }
+
+    managementApi.deleteCatalog(catalogName);
+  }
+
   @FormatMethod
   protected List<Object[]> sql(String query, Object... args) {
     List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
@@ -110,4 +252,8 @@ protected List<String> listDirs(String path) {
   protected String generateName(String prefix) {
     return prefix + "_" + UUID.randomUUID().toString().replaceAll("-", "");
   }
+
+  protected static Dataset<Row> onSpark(@Language("SQL") String sql) {
+    return spark.sql(sql);
+  }
 }
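
Concrete suites extend this base class and exercise the catalog through the SQL helpers, relying on the per-test catalog setup and teardown above. A minimal hypothetical subclass; the test name and body are illustrative and assume AssertJ is on the test classpath:

package org.apache.polaris.spark.quarkus.it;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.junit.jupiter.api.Test;

// Hypothetical smoke test built on the inherited setup/teardown.
public class NamespaceSmokeIT extends SparkIntegrationBase {
  @Test
  void createAndListNamespace() {
    String ns = generateName("ns");
    onSpark("CREATE NAMESPACE " + ns);
    List<Object[]> rows = sql("SHOW NAMESPACES");
    assertThat(rows).anySatisfy(row -> assertThat(row[0]).isEqualTo(ns));
    onSpark("DROP NAMESPACE " + ns);
  }
}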
