Commit 8b85536

haizhou-zhao (Haizhou Zhao) authored and nastra committed
Add REST Catalog tests to Spark 3.5 integration test (apache#11093)
* Add REST Catalog tests to Spark 3.5 integration test

  Add REST Catalog tests to Spark 3.4 integration test
  tmp save
  Fix integ tests
  Revert "Add REST Catalog tests to Spark 3.4 integration test"
  This reverts commit d052416.
  unneeded changes
  fix test
  retrigger checks
  Fix integ test
  Fix port already in use
  Fix unmatched validation catalog
  spotless
  Fix sqlite related test failures

* Rebase & spotless
* code format
* unneeded change
* unneeded change
* Revert "unneeded change"

  This reverts commit ae29c41.

* code format
* Use in-mem config to configure RCK
* Update open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCatalogServer.java
* Use RESTServerExtension
* check style and test failure
* test failure
* fix test
* fix test
* spotless
* Update open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCatalogServer.java (Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>)
* Update open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCatalogServer.java (Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>)
* Update spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java (Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>)
* Spotless and fix test
* Apply suggestions from code review
* Apply suggestions from code review
* Apply suggestions from code review
* Update spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
* Package protected RCKUtils
* spotless
* unintentional change
* remove warehouse specification from rest
* spotless
* move find free port to rest server extension
* fix typo
* checkstyle
* fix unit test

---------

Co-authored-by: Haizhou Zhao <haizhouzhao@Haizhous-MacBook-Pro.local>
Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com>
1 parent c186ff7 commit 8b85536

File tree

13 files changed: +195 -21


open-api/src/testFixtures/java/org/apache/iceberg/rest/RCKUtils.java

+20 -2

@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.rest;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.ServerSocket;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -77,14 +80,21 @@ static Map<String, String> environmentCatalogConfig() {
   }
 
   static RESTCatalog initCatalogClient() {
+    return initCatalogClient(Maps.newHashMap());
+  }
+
+  static RESTCatalog initCatalogClient(Map<String, String> properties) {
     Map<String, String> catalogProperties = Maps.newHashMap();
     catalogProperties.putAll(RCKUtils.environmentCatalogConfig());
     catalogProperties.putAll(Maps.fromProperties(System.getProperties()));
+    catalogProperties.putAll(properties);
 
     // Set defaults
+    String port =
+        catalogProperties.getOrDefault(
+            RESTCatalogServer.REST_PORT, String.valueOf(RESTCatalogServer.REST_PORT_DEFAULT));
     catalogProperties.putIfAbsent(
-        CatalogProperties.URI,
-        String.format("http://localhost:%s/", RESTCatalogServer.REST_PORT_DEFAULT));
+        CatalogProperties.URI, String.format("http://localhost:%s/", port));
     catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION, "rck_warehouse");
 
     RESTCatalog catalog = new RESTCatalog();
@@ -107,4 +117,12 @@ static void purgeCatalogTestEntries(RESTCatalog catalog) {
               catalog.dropNamespace(namespace);
             });
   }
+
+  static int findFreePort() {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
 }
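
Why findFreePort works: binding a ServerSocket to port 0 asks the OS for an unused ephemeral port, and closing the socket releases it for the server that is started next. A minimal standalone sketch of the same pattern (the class name is illustrative, not part of this commit):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class FreePortProbe {
  public static void main(String[] args) {
    // Port 0 means "any free port"; the OS picks one and getLocalPort() reveals it.
    try (ServerSocket socket = new ServerSocket(0)) {
      System.out.println("free port: " + socket.getLocalPort());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

There is a small window between closing the probe socket and the test server binding the port, but for single-host test runs that race is acceptable in practice.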

open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCatalogServer.java

+12 -3

@@ -26,6 +26,7 @@
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.PropertyUtil;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.gzip.GzipHandler;
@@ -37,12 +38,19 @@
 public class RESTCatalogServer {
   private static final Logger LOG = LoggerFactory.getLogger(RESTCatalogServer.class);
 
-  static final String REST_PORT = "rest.port";
+  public static final String REST_PORT = "rest.port";
   static final int REST_PORT_DEFAULT = 8181;
 
   private Server httpServer;
+  private final Map<String, String> config;
 
-  RESTCatalogServer() {}
+  RESTCatalogServer() {
+    this.config = Maps.newHashMap();
+  }
+
+  RESTCatalogServer(Map<String, String> config) {
+    this.config = config;
+  }
 
   static class CatalogContext {
     private final Catalog catalog;
@@ -64,7 +72,8 @@ public Map<String, String> configuration() {
 
   private CatalogContext initializeBackendCatalog() throws IOException {
     // Translate environment variables to catalog properties
-    Map<String, String> catalogProperties = RCKUtils.environmentCatalogConfig();
+    Map<String, String> catalogProperties = Maps.newHashMap(RCKUtils.environmentCatalogConfig());
+    catalogProperties.putAll(config);
 
     // Fallback to a JDBCCatalog impl if one is not set
     catalogProperties.putIfAbsent(CatalogProperties.CATALOG_IMPL, JdbcCatalog.class.getName());
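
Because the explicit config map is applied after the environment-derived properties, entries passed to the new constructor take precedence over CATALOG_* environment variables. A hedged sketch of driving the server directly, assuming the caller lives in org.apache.iceberg.rest (the constructors are package-private) and using the start(false)/stop() calls that RESTServerExtension relies on below; the port value is illustrative:

package org.apache.iceberg.rest;

import java.util.Map;

class RESTCatalogServerDemo {
  public static void main(String[] args) throws Exception {
    // Overrides here win over environment variables because they are applied
    // last in initializeBackendCatalog().
    RESTCatalogServer server =
        new RESTCatalogServer(Map.of(RESTCatalogServer.REST_PORT, "8182"));
    server.start(false); // false = return immediately instead of joining the server thread
    try {
      // ... issue REST requests against http://localhost:8182/ ...
    } finally {
      server.stop();
    }
  }
}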

open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTServerExtension.java

+34 -1

@@ -18,19 +18,49 @@
  */
 package org.apache.iceberg.rest;
 
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 
 public class RESTServerExtension implements BeforeAllCallback, AfterAllCallback {
+  // if the caller explicitly wants the server to start on port 0, it means the caller wants to
+  // launch on a free port
+  public static final String FREE_PORT = "0";
+
   private RESTCatalogServer localServer;
+  private RESTCatalog client;
+  private final Map<String, String> config;
+
+  public RESTServerExtension() {
+    config = Maps.newHashMap();
+  }
+
+  public RESTServerExtension(Map<String, String> config) {
+    Map<String, String> conf = Maps.newHashMap(config);
+    if (conf.containsKey(RESTCatalogServer.REST_PORT)
+        && conf.get(RESTCatalogServer.REST_PORT).equals(FREE_PORT)) {
+      conf.put(RESTCatalogServer.REST_PORT, String.valueOf(RCKUtils.findFreePort()));
+    }
+    this.config = conf;
+  }
+
+  public Map<String, String> config() {
+    return config;
+  }
+
+  public RESTCatalog client() {
+    return client;
+  }
 
   @Override
   public void beforeAll(ExtensionContext extensionContext) throws Exception {
     if (Boolean.parseBoolean(
         extensionContext.getConfigurationParameter(RCKUtils.RCK_LOCAL).orElse("true"))) {
-      this.localServer = new RESTCatalogServer();
+      this.localServer = new RESTCatalogServer(config);
       this.localServer.start(false);
+      this.client = RCKUtils.initCatalogClient(config);
    }
  }
 
@@ -39,5 +69,8 @@ public void afterAll(ExtensionContext extensionContext) throws Exception {
     if (localServer != null) {
       localServer.stop();
     }
+    if (client != null) {
+      client.close();
+    }
   }
 }
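
Taken together, the extension gives a JUnit 5 class a self-contained REST catalog: the server starts once per class on a collision-free port, the client is wired to whatever port was chosen, and both are torn down afterwards. A minimal usage sketch (the test class and body are illustrative; TestBaseWithCatalog below does the same wiring):

import java.util.Map;
import org.apache.iceberg.rest.RESTCatalogServer;
import org.apache.iceberg.rest.RESTServerExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class RestCatalogSmokeTest {
  // FREE_PORT ("0") is rewritten to a concrete free port before the server starts,
  // so parallel test JVMs do not fight over the default port 8181.
  @RegisterExtension
  static final RESTServerExtension REST_SERVER =
      new RESTServerExtension(
          Map.of(RESTCatalogServer.REST_PORT, RESTServerExtension.FREE_PORT));

  @Test
  void listNamespaces() {
    // client() returns the RESTCatalog that beforeAll() pointed at the local server
    REST_SERVER.client().listNamespaces().forEach(System.out::println);
  }
}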

spark/v3.5/build.gradle

+22

@@ -107,8 +107,13 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
   testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
   testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
   testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts')
+  testImplementation (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
+    transitive = false
+  }
   testImplementation libs.sqlite.jdbc
   testImplementation libs.awaitility
+  // runtime dependencies for running REST Catalog based integration test
+  testRuntimeOnly libs.jetty.servlet
 }
 
 test {
@@ -172,6 +177,12 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
   testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
   testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
   testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+  testImplementation (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
+    transitive = false
+  }
+  // runtime dependencies for running REST Catalog based integration test
+  testRuntimeOnly libs.jetty.servlet
+  testRuntimeOnly libs.sqlite.jdbc
 
   testImplementation libs.avro.avro
   testImplementation libs.parquet.hadoop
@@ -255,6 +266,17 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
   integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
   integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
   integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+
+  // runtime dependencies for running Hive Catalog based integration test
+  integrationRuntimeOnly project(':iceberg-hive-metastore')
+  // runtime dependencies for running REST Catalog based integration test
+  integrationRuntimeOnly project(path: ':iceberg-core', configuration: 'testArtifacts')
+  integrationRuntimeOnly (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
+    transitive = false
+  }
+  integrationRuntimeOnly libs.jetty.servlet
+  integrationRuntimeOnly libs.sqlite.jdbc
+
   // Not allowed on our classpath, only the runtime jar is allowed
   integrationCompileOnly project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}")
   integrationCompileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}")

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java

+9 -1

@@ -18,8 +18,11 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.util.Comparator;
@@ -521,7 +524,7 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
             optional(3, "category", Types.StringType.get())));
 
     spark.createDataFrame(newRecords, newSparkSchema).coalesce(1).writeTo(tableName).append();
-
+    table.refresh();
     Long currentSnapshotId = table.currentSnapshot().snapshotId();
 
     Dataset<Row> actualFilesDs =
@@ -740,6 +743,11 @@ private boolean partitionMatch(Record file, String partValue) {
 
   @TestTemplate
   public void metadataLogEntriesAfterReplacingTable() throws Exception {
+    assumeThat(catalogConfig.get(ICEBERG_CATALOG_TYPE))
+        .as(
+            "need to fix https://github.com/apache/iceberg/issues/11109 before enabling this for the REST catalog")
+        .isNotEqualTo(ICEBERG_CATALOG_TYPE_REST);
+
     sql(
         "CREATE TABLE %s (id bigint, data string) "
             + "USING iceberg "

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java

+7 -5

@@ -450,12 +450,14 @@ public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
 
     String statsFileName = "stats-file-" + UUID.randomUUID();
+    String location = table.location();
+    // not every catalog will return file proto for local directories
+    // i.e. Hadoop and Hive Catalog do, Jdbc and REST do not
+    if (!location.startsWith("file:")) {
+      location = "file:" + location;
+    }
     File statsLocation =
-        new File(new URI(table.location()))
-            .toPath()
-            .resolve("data")
-            .resolve(statsFileName)
-            .toFile();
+        new File(new URI(location)).toPath().resolve("data").resolve(statsFileName).toFile();
     StatisticsFile statisticsFile;
     try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
       long snapshotId = table.currentSnapshot().snapshotId();
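
The prefixing is needed because new File(URI) accepts only absolute URIs with a file: scheme, while JDBC- and REST-backed tables report bare paths such as /tmp/warehouse/db/table. A small self-contained sketch of the normalization (the path is illustrative):

import java.io.File;
import java.net.URI;

class FileUriNormalization {
  public static void main(String[] args) throws Exception {
    String location = "/tmp/warehouse/db/table"; // what a JDBC or REST catalog may return
    // Without the scheme, new File(new URI(location)) throws IllegalArgumentException
    // because the URI is not absolute.
    if (!location.startsWith("file:")) {
      location = "file:" + location;
    }
    File dataDir = new File(new URI(location)).toPath().resolve("data").toFile();
    System.out.println(dataDir);
  }
}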

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java

+10

@@ -18,8 +18,10 @@
  */
 package org.apache.iceberg.spark;
 
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(ParameterizedTestExtension.class)
@@ -43,6 +45,14 @@ protected static Object[][] parameters() {
         SparkCatalogConfig.SPARK.catalogName(),
         SparkCatalogConfig.SPARK.implementation(),
         SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        SparkCatalogConfig.REST.catalogName(),
+        SparkCatalogConfig.REST.implementation(),
+        ImmutableMap.builder()
+            .putAll(SparkCatalogConfig.REST.properties())
+            .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
+            .build()
       }
     };
   }

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java

+4

@@ -34,6 +34,10 @@ public enum SparkCatalogConfig {
       "testhadoop",
       SparkCatalog.class.getName(),
       ImmutableMap.of("type", "hadoop", "cache-enabled", "false")),
+  REST(
+      "testrest",
+      SparkCatalog.class.getName(),
+      ImmutableMap.of("type", "rest", "cache-enabled", "false")),
   SPARK(
       "spark_catalog",
       SparkSessionCatalog.class.getName(),

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java

+58 -7

@@ -18,6 +18,11 @@
  */
 package org.apache.iceberg.spark;
 
+import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
@@ -36,17 +41,38 @@
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTCatalogServer;
+import org.apache.iceberg.rest.RESTServerExtension;
 import org.apache.iceberg.util.PropertyUtil;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
 
 @ExtendWith(ParameterizedTestExtension.class)
 public abstract class TestBaseWithCatalog extends TestBase {
   protected static File warehouse = null;
 
+  @RegisterExtension
+  private static final RESTServerExtension REST_SERVER_EXTENSION =
+      new RESTServerExtension(
+          Map.of(
+              RESTCatalogServer.REST_PORT,
+              RESTServerExtension.FREE_PORT,
+              // In-memory sqlite database by default is private to the connection that created it.
+              // If more than 1 jdbc connection backed by in-memory sqlite is created behind one
+              // JdbcCatalog, then different jdbc connections could provide different views of table
+              // status even belonging to the same catalog. Reference:
+              // https://www.sqlite.org/inmemorydb.html
+              CatalogProperties.CLIENT_POOL_SIZE,
+              "1"));
+
+  protected static RESTCatalog restCatalog;
+
   @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
   protected static Object[][] parameters() {
     return new Object[][] {
@@ -59,13 +85,14 @@ protected static Object[][] parameters() {
   }
 
   @BeforeAll
-  public static void createWarehouse() throws IOException {
+  public static void setUpAll() throws IOException {
     TestBaseWithCatalog.warehouse = File.createTempFile("warehouse", null);
     assertThat(warehouse.delete()).isTrue();
+    restCatalog = REST_SERVER_EXTENSION.client();
   }
 
   @AfterAll
-  public static void dropWarehouse() throws IOException {
+  public static void tearDownAll() throws IOException {
     if (warehouse != null && warehouse.exists()) {
       Path warehousePath = new Path(warehouse.getAbsolutePath());
       FileSystem fs = warehousePath.getFileSystem(hiveConf);
@@ -89,13 +116,37 @@ public static void dropWarehouse() throws IOException {
   protected TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table");
   protected String tableName;
 
+  private void configureValidationCatalog() {
+    if (catalogConfig.containsKey(ICEBERG_CATALOG_TYPE)) {
+      switch (catalogConfig.get(ICEBERG_CATALOG_TYPE)) {
+        case ICEBERG_CATALOG_TYPE_HADOOP:
+          this.validationCatalog =
+              new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse);
+          break;
+        case ICEBERG_CATALOG_TYPE_REST:
+          this.validationCatalog = restCatalog;
+          break;
+        case ICEBERG_CATALOG_TYPE_HIVE:
+          this.validationCatalog = catalog;
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown catalog type");
+      }
+    } else if (catalogConfig.containsKey(CATALOG_IMPL)) {
+      switch (catalogConfig.get(CATALOG_IMPL)) {
+        case "org.apache.iceberg.inmemory.InMemoryCatalog":
+          this.validationCatalog = new InMemoryCatalog();
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown catalog impl");
+      }
+    }
+    this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
+  }
+
   @BeforeEach
   public void before() {
-    this.validationCatalog =
-        catalogName.equals("testhadoop")
-            ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse)
-            : catalog;
-    this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
+    configureValidationCatalog();
 
     spark.conf().set("spark.sql.catalog." + catalogName, implementation);
     catalogConfig.forEach(