
Add REST Catalog tests to Spark 3.5 integration test #11093

Merged (32 commits) on Nov 21, 2024

Commits
8d58247
Add REST Catalog tests to Spark 3.5 integration test
Sep 6, 2024
aad8b27
Rebase & spotless
Sep 18, 2024
18d7c8a
code format
Oct 10, 2024
b82ca03
unneeded change
Oct 11, 2024
89a6a1d
unneeded change
Oct 11, 2024
00d9fcd
Revert "unneeded change"
Oct 11, 2024
5010a71
code format
Oct 11, 2024
8ec1929
Use in-mem config to configure RCK
Oct 14, 2024
c07b005
Update open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCat…
nastra Oct 23, 2024
0a613f6
Use RESTServerExtension
Oct 29, 2024
c914475
check style and test failure
Oct 29, 2024
d18c3fa
test failure
Oct 29, 2024
82cca8b
fix test
Oct 29, 2024
4672381
fix test
Oct 29, 2024
c1db1f9
spotless
Oct 29, 2024
de4be14
Update open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCat…
haizhou-zhao Oct 30, 2024
4e2dd03
Update open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTCat…
haizhou-zhao Oct 30, 2024
c549eb3
Update spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBa…
haizhou-zhao Oct 30, 2024
043cd83
Spotless and fix test
Oct 30, 2024
b854759
Apply suggestions from code review
nastra Oct 31, 2024
bf0dd18
Apply suggestions from code review
nastra Oct 31, 2024
b729f4f
Apply suggestions from code review
nastra Oct 31, 2024
45b76f4
Update spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBa…
nastra Oct 31, 2024
c9f06b9
Package protected RCKUtils
Oct 31, 2024
2f1ad6c
spotless
Oct 31, 2024
a23bc7f
unintentional change
Oct 31, 2024
eb1a345
remove warehouse specification from rest
Oct 31, 2024
048f037
spotless
Oct 31, 2024
bac5d17
move find free port to rest server extension
Nov 14, 2024
85655d8
fix typo
Nov 14, 2024
0ef2c2b
checkstyle
Nov 14, 2024
2d31dc0
fix unit test
Nov 15, 2024
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.rest;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -77,14 +80,21 @@ static Map<String, String> environmentCatalogConfig() {
}

static RESTCatalog initCatalogClient() {
return initCatalogClient(Maps.newHashMap());
}

static RESTCatalog initCatalogClient(Map<String, String> properties) {
Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.putAll(RCKUtils.environmentCatalogConfig());
catalogProperties.putAll(Maps.fromProperties(System.getProperties()));
catalogProperties.putAll(properties);

// Set defaults
String port =
catalogProperties.getOrDefault(
RESTCatalogServer.REST_PORT, String.valueOf(RESTCatalogServer.REST_PORT_DEFAULT));
catalogProperties.putIfAbsent(
CatalogProperties.URI,
String.format("http://localhost:%s/", RESTCatalogServer.REST_PORT_DEFAULT));
CatalogProperties.URI, String.format("http://localhost:%s/", port));
catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION, "rck_warehouse");

RESTCatalog catalog = new RESTCatalog();
@@ -107,4 +117,12 @@ static void purgeCatalogTestEntries(RESTCatalog catalog) {
catalog.dropNamespace(namespace);
});
}

static int findFreePort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
@@ -26,6 +26,7 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
@@ -37,12 +38,19 @@
public class RESTCatalogServer {
private static final Logger LOG = LoggerFactory.getLogger(RESTCatalogServer.class);

static final String REST_PORT = "rest.port";
public static final String REST_PORT = "rest.port";
static final int REST_PORT_DEFAULT = 8181;

private Server httpServer;
private final Map<String, String> config;

RESTCatalogServer() {}
RESTCatalogServer() {
this.config = Maps.newHashMap();
}

RESTCatalogServer(Map<String, String> config) {
this.config = config;
}

static class CatalogContext {
private final Catalog catalog;
@@ -64,7 +72,8 @@ public Map<String, String> configuration() {

private CatalogContext initializeBackendCatalog() throws IOException {
// Translate environment variables to catalog properties
Map<String, String> catalogProperties = RCKUtils.environmentCatalogConfig();
Map<String, String> catalogProperties = Maps.newHashMap(RCKUtils.environmentCatalogConfig());
catalogProperties.putAll(config);

// Fallback to a JDBCCatalog impl if one is not set
catalogProperties.putIfAbsent(CatalogProperties.CATALOG_IMPL, JdbcCatalog.class.getName());
@@ -18,19 +18,49 @@
*/
package org.apache.iceberg.rest;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public class RESTServerExtension implements BeforeAllCallback, AfterAllCallback {
// if the caller explicitly wants the server to start on port 0, it means the caller wants to
// launch on a free port
public static final String FREE_PORT = "0";

private RESTCatalogServer localServer;
private RESTCatalog client;
private final Map<String, String> config;

public RESTServerExtension() {
config = Maps.newHashMap();
}

public RESTServerExtension(Map<String, String> config) {
Map<String, String> conf = Maps.newHashMap(config);
if (conf.containsKey(RESTCatalogServer.REST_PORT)
&& conf.get(RESTCatalogServer.REST_PORT).equals(FREE_PORT)) {
conf.put(RESTCatalogServer.REST_PORT, String.valueOf(RCKUtils.findFreePort()));
}
this.config = conf;
}

public Map<String, String> config() {
return config;
}

public RESTCatalog client() {
return client;
}

@Override
public void beforeAll(ExtensionContext extensionContext) throws Exception {
if (Boolean.parseBoolean(
extensionContext.getConfigurationParameter(RCKUtils.RCK_LOCAL).orElse("true"))) {
this.localServer = new RESTCatalogServer();
this.localServer = new RESTCatalogServer(config);
this.localServer.start(false);
this.client = RCKUtils.initCatalogClient(config);
}
}

@@ -39,5 +69,8 @@ public void afterAll(ExtensionContext extensionContext) throws Exception {
if (localServer != null) {
localServer.stop();
}
if (client != null) {
client.close();
}
}
}
22 changes: 22 additions & 0 deletions spark/v3.5/build.gradle
@@ -107,8 +107,13 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
transitive = false
}
testImplementation libs.sqlite.jdbc
testImplementation libs.awaitility
// runtime dependencies for running REST Catalog based integration test
testRuntimeOnly libs.jetty.servlet
}

test {
@@ -172,6 +177,12 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
transitive = false
}
// runtime dependencies for running REST Catalog based integration test
testRuntimeOnly libs.jetty.servlet
testRuntimeOnly libs.sqlite.jdbc

testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
@@ -255,6 +266,17 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')

// runtime dependencies for running Hive Catalog based integration test
integrationRuntimeOnly project(':iceberg-hive-metastore')
// runtime dependencies for running REST Catalog based integration test
integrationRuntimeOnly project(path: ':iceberg-core', configuration: 'testArtifacts')
integrationRuntimeOnly (project(path: ':iceberg-open-api', configuration: 'testFixturesRuntimeElements')) {
transitive = false
}
integrationRuntimeOnly libs.jetty.servlet
integrationRuntimeOnly libs.sqlite.jdbc

// Not allowed on our classpath, only the runtime jar is allowed
integrationCompileOnly project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}")
integrationCompileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}")
@@ -18,8 +18,11 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.IOException;
import java.util.Comparator;
@@ -521,7 +524,7 @@ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
optional(3, "category", Types.StringType.get())));

spark.createDataFrame(newRecords, newSparkSchema).coalesce(1).writeTo(tableName).append();

table.refresh();
Contributor: Is this refresh only needed for REST?

Contributor Author: Yes, RESTTableOperations and the other TableOperations implementations have different mechanisms for refreshing metadata.
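A minimal sketch of what that means in practice (illustrative only; it assumes the test-base helpers validationCatalog, tableIdent, sql and tableName, and a hypothetical (id, data) table that already has a snapshot):

  // The validation handle caches table metadata when it is loaded.
  Table table = validationCatalog.loadTable(tableIdent);
  long before = table.currentSnapshot().snapshotId();

  // The write goes through the Spark session, i.e. a separate catalog client,
  // which commits a new snapshot behind the handle's back.
  sql("INSERT INTO %s VALUES (1, 'a')", tableName); // hypothetical row

  // refresh() re-reads the metadata through the catalog's TableOperations
  // (RESTTableOperations included); only then does currentSnapshot() see the commit.
  table.refresh();
  long after = table.currentSnapshot().snapshotId();
  assert after != before;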

Long currentSnapshotId = table.currentSnapshot().snapshotId();

Dataset<Row> actualFilesDs =
@@ -740,6 +743,11 @@ private boolean partitionMatch(Record file, String partValue) {

@TestTemplate
public void metadataLogEntriesAfterReplacingTable() throws Exception {
assumeThat(catalogConfig.get(ICEBERG_CATALOG_TYPE))
.as(
"need to fix https://github.com/apache/iceberg/issues/11109 before enabling this for the REST catalog")
.isNotEqualTo(ICEBERG_CATALOG_TYPE_REST);

sql(
"CREATE TABLE %s (id bigint, data string) "
+ "USING iceberg "
@@ -450,12 +450,14 @@ public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
Table table = Spark3Util.loadIcebergTable(spark, tableName);

String statsFileName = "stats-file-" + UUID.randomUUID();
String location = table.location();
// not every catalog will return file proto for local directories
// i.e. Hadoop and Hive Catalog do, Jdbc and REST do not
if (!location.startsWith("file:")) {
location = "file:" + location;
}
File statsLocation =
new File(new URI(table.location()))
.toPath()
.resolve("data")
.resolve(statsFileName)
.toFile();
new File(new URI(location)).toPath().resolve("data").resolve(statsFileName).toFile();
StatisticsFile statisticsFile;
try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
long snapshotId = table.currentSnapshot().snapshotId();
@@ -18,8 +18,10 @@
*/
package org.apache.iceberg.spark;

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
@@ -43,6 +45,14 @@ protected static Object[][] parameters() {
SparkCatalogConfig.SPARK.catalogName(),
SparkCatalogConfig.SPARK.implementation(),
SparkCatalogConfig.SPARK.properties()
},
{
SparkCatalogConfig.REST.catalogName(),
SparkCatalogConfig.REST.implementation(),
ImmutableMap.builder()
.putAll(SparkCatalogConfig.REST.properties())
.put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
.build()
}
};
}
@@ -34,6 +34,10 @@ public enum SparkCatalogConfig {
"testhadoop",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "hadoop", "cache-enabled", "false")),
REST(
"testrest",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "rest", "cache-enabled", "false")),
SPARK(
"spark_catalog",
SparkSessionCatalog.class.getName(),
@@ -18,6 +18,11 @@
*/
package org.apache.iceberg.spark;

import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
@@ -36,17 +41,38 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTCatalogServer;
import org.apache.iceberg.rest.RESTServerExtension;
import org.apache.iceberg.util.PropertyUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(ParameterizedTestExtension.class)
public abstract class TestBaseWithCatalog extends TestBase {
protected static File warehouse = null;

@RegisterExtension
private static final RESTServerExtension REST_SERVER_EXTENSION =
new RESTServerExtension(
Map.of(
RESTCatalogServer.REST_PORT,
RESTServerExtension.FREE_PORT,
// In-memory sqlite database by default is private to the connection that created it.
// If more than 1 jdbc connection backed by in-memory sqlite is created behind one
// JdbcCatalog, then different jdbc connections could provide different views of table
// status even belonging to the same catalog. Reference:
// https://www.sqlite.org/inmemorydb.html
CatalogProperties.CLIENT_POOL_SIZE,
"1"));

protected static RESTCatalog restCatalog;

@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
protected static Object[][] parameters() {
return new Object[][] {
@@ -59,13 +85,14 @@ protected static Object[][] parameters() {
}

@BeforeAll
public static void createWarehouse() throws IOException {
public static void setUpAll() throws IOException {
TestBaseWithCatalog.warehouse = File.createTempFile("warehouse", null);
assertThat(warehouse.delete()).isTrue();
restCatalog = REST_SERVER_EXTENSION.client();
Contributor (@danielcweeks, Nov 14, 2024):
I feel like we're almost there, but we're still doing too much here that should be done in the extension.

We should make this test base implement either the BeforeAllCallback or the BeforeEachCallback which allows for using the extension store:

  @Override
  public void beforeAll(ExtensionContext extensionContext) {
    restCatalog = (RESTCatalog) extensionContext.getStore(ExtensionContext.Namespace.create("rest")).get("rest-catalog");
  }

This will actually get called before the @BeforeAll annotation, so we can capture what we need to from the context store and configure for the rest of the tests.

Contributor:
I think if you were to use the BeforeEachCallback interface, you could fully replace the static restCatalog reference and just pull from the context when the validation catalog is REST.

Contributor Author:
As discussed offline, the biggest issues right now with moving the REST client init into the BeforeEach call are that:

  1. The REST server & client (and all their configuration) are initialized in the extension, not in the test class itself.
  2. The REST server port is dynamically assigned, but that information is needed to initialize the Spark session in a static context.

For 1, the current workaround is to give the extension class getter methods so that the test class can obtain the necessary information, as sketched below.

For 2, as an example, the existing test Hive Metastore server for the Spark tests also has its port dynamically assigned. The existing approach is to initialize the test Hive Metastore in the test class itself in its BeforeAll call, so that the server's hiveConf can be used to initialize the Spark session in later steps of the BeforeAll calls. Looking back at the REST client: unless we refactor the Spark session to be initialized in the BeforeEach stage, it is unlikely we can delay the REST client init to the BeforeEach stage. Meanwhile, initializing the Spark session in BeforeEach (every test spinning up its own Spark session) would be quite expensive.
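A minimal sketch of the getter-based workaround from point 1 (illustrative only; it assumes the client() getter, REST_PORT and FREE_PORT constants added in this PR, plus the spark session and the "testrest" catalog name from the test base):

  @RegisterExtension
  private static final RESTServerExtension REST_SERVER_EXTENSION =
      new RESTServerExtension(
          Map.of(RESTCatalogServer.REST_PORT, RESTServerExtension.FREE_PORT));

  protected static RESTCatalog restCatalog;

  @BeforeAll
  public static void setUpAll() {
    // The extension's beforeAll has already picked a free port, started the server,
    // and built a client; the getter hands that client back to the static setup.
    restCatalog = REST_SERVER_EXTENSION.client();

    // The dynamically assigned URI can then flow into the Spark catalog config.
    String restUri = restCatalog.properties().get(CatalogProperties.URI);
    spark.conf().set("spark.sql.catalog.testrest.uri", restUri);
  }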

Contributor Author:
I'm thinking the other way to refactor this is to initialize the Spark session in a BeforeAllCallback extension class (which perhaps also means refactoring the existing HiveCatalog & HadoopCatalog initialization into separate extension classes to take care of the existing Spark tests).

As mentioned before, as long as we are still initializing the Spark session in the BeforeAll stage, it is unlikely we can delay catalog init to the BeforeEach stage.

Yet if the Spark session is also initialized in a separate extension class, we might not need getters on the catalog extension to expose anything catalog-related to the outside world (info can pass freely between extension classes); we would only need getters on the Spark extension class so that the test classes themselves can get a handle on the Spark session.

}

@AfterAll
public static void dropWarehouse() throws IOException {
public static void tearDownAll() throws IOException {
if (warehouse != null && warehouse.exists()) {
Path warehousePath = new Path(warehouse.getAbsolutePath());
FileSystem fs = warehousePath.getFileSystem(hiveConf);
@@ -89,13 +116,37 @@ public static void dropWarehouse() throws IOException {
protected TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table");
protected String tableName;

private void configureValidationCatalog() {
if (catalogConfig.containsKey(ICEBERG_CATALOG_TYPE)) {
switch (catalogConfig.get(ICEBERG_CATALOG_TYPE)) {
case ICEBERG_CATALOG_TYPE_HADOOP:
this.validationCatalog =
new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse);
break;
case ICEBERG_CATALOG_TYPE_REST:
this.validationCatalog = restCatalog;
break;
case ICEBERG_CATALOG_TYPE_HIVE:
this.validationCatalog = catalog;
break;
default:
throw new IllegalArgumentException("Unknown catalog type");
}
} else if (catalogConfig.containsKey(CATALOG_IMPL)) {
switch (catalogConfig.get(CATALOG_IMPL)) {
case "org.apache.iceberg.inmemory.InMemoryCatalog":
this.validationCatalog = new InMemoryCatalog();
break;
default:
throw new IllegalArgumentException("Unknown catalog impl");
}
}
this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
}

@BeforeEach
public void before() {
this.validationCatalog =
Contributor:
What happens if we don't make any changes to how the validation catalog is configured?

Contributor Author:
Something like: you create a table in the REST catalog, but validate against the Hive catalog.

Contributor:
My point is that I think all tests should pass even when we don't make any changes to the validation catalog.

Contributor Author:
If I understand correctly, the original validationCatalog is set to a HadoopCatalog if the catalog being tested is named testhadoop; otherwise, validationCatalog is the same as catalog (which is strictly a HiveCatalog, as defined by the TestBase class).

That worked before because only two types of catalogs, Hadoop and Hive, were being tested: if the test subject was not a Hadoop catalog, setting the validation catalog to the Hive catalog sufficed. However, with the introduction of a third type, the REST catalog, if we don't change how validationCatalog is initialized, then when the test subject is a REST catalog the validationCatalog will still be set to a Hive catalog. In that case, performing test operations on the REST catalog while validating the post-change state against the Hive catalog won't work.

Unless you are suggesting that we change the TestBase class so that the catalog being tested does not strictly need to be a HiveCatalog and can be any type of catalog.

catalogName.equals("testhadoop")
? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse)
: catalog;
this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
configureValidationCatalog();

spark.conf().set("spark.sql.catalog." + catalogName, implementation);
catalogConfig.forEach(