From cb35151ed206348f641dc7e6b70b3a4890399a56 Mon Sep 17 00:00:00 2001 From: tomtongue Date: Mon, 12 May 2025 18:23:22 +0900 Subject: [PATCH] Migrate Spark3.4 SparkTestBase related tests --- .../apache/iceberg/spark/SparkTestBase.java | 287 ------------------ .../spark/source/TestChangelogReader.java | 32 +- .../source/TestIdentityPartitionData.java | 120 +++++--- .../source/TestSparkMetadataColumns.java | 96 +++--- .../source/TestTimestampWithoutZone.java | 70 +++-- .../spark/source/TestChangelogReader.java | 2 +- .../source/TestIdentityPartitionData.java | 15 +- .../source/TestTimestampWithoutZone.java | 10 +- 8 files changed, 197 insertions(+), 435 deletions(-) delete mode 100644 spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java deleted file mode 100644 index 3e8953fb950c..000000000000 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark; - -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.URI; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.ContentFile; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.hive.HiveCatalog; -import org.apache.iceberg.hive.TestHiveMetastore; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.apache.spark.sql.execution.QueryExecution; -import org.apache.spark.sql.execution.SparkPlan; -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec; -import org.apache.spark.sql.internal.SQLConf; -import org.apache.spark.sql.util.QueryExecutionListener; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; - -public abstract class SparkTestBase extends SparkTestHelperBase { - - protected static TestHiveMetastore metastore = null; - protected static HiveConf hiveConf = null; - protected static SparkSession spark = null; - protected static JavaSparkContext sparkContext = null; - protected static HiveCatalog catalog = null; - - @BeforeClass - public static void startMetastoreAndSpark() { - SparkTestBase.metastore = new TestHiveMetastore(); - metastore.start(); - SparkTestBase.hiveConf = metastore.hiveConf(); - - SparkTestBase.spark = - SparkSession.builder() - .master("local[2]") - .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") - .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) - .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") - .enableHiveSupport() - .getOrCreate(); - - SparkTestBase.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - SparkTestBase.catalog = - (HiveCatalog) - CatalogUtil.loadCatalog( - HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); - - try { - catalog.createNamespace(Namespace.of("default")); - } catch (AlreadyExistsException ignored) { - // the default namespace already exists. ignore the create error - } - } - - @AfterClass - public static void stopMetastoreAndSpark() throws Exception { - SparkTestBase.catalog = null; - if (metastore != null) { - metastore.stop(); - SparkTestBase.metastore = null; - } - if (spark != null) { - spark.stop(); - SparkTestBase.spark = null; - SparkTestBase.sparkContext = null; - } - } - - protected long waitUntilAfter(long timestampMillis) { - long current = System.currentTimeMillis(); - while (current <= timestampMillis) { - current = System.currentTimeMillis(); - } - return current; - } - - protected List sql(String query, Object... args) { - List rows = spark.sql(String.format(query, args)).collectAsList(); - if (rows.size() < 1) { - return ImmutableList.of(); - } - - return rowsToJava(rows); - } - - protected Object scalarSql(String query, Object... args) { - List rows = sql(query, args); - Assert.assertEquals("Scalar SQL should return one row", 1, rows.size()); - Object[] row = Iterables.getOnlyElement(rows); - Assert.assertEquals("Scalar SQL should return one value", 1, row.length); - return row[0]; - } - - protected Object[] row(Object... values) { - return values; - } - - protected static String dbPath(String dbName) { - return metastore.getDatabasePath(dbName); - } - - protected void withUnavailableFiles(Iterable> files, Action action) { - Iterable fileLocations = Iterables.transform(files, ContentFile::location); - withUnavailableLocations(fileLocations, action); - } - - private void move(String location, String newLocation) { - Path path = Paths.get(URI.create(location)); - Path tempPath = Paths.get(URI.create(newLocation)); - - try { - Files.move(path, tempPath); - } catch (IOException e) { - throw new UncheckedIOException("Failed to move: " + location, e); - } - } - - protected void withUnavailableLocations(Iterable locations, Action action) { - for (String location : locations) { - move(location, location + "_temp"); - } - - try { - action.invoke(); - } finally { - for (String location : locations) { - move(location + "_temp", location); - } - } - } - - protected void withDefaultTimeZone(String zoneId, Action action) { - TimeZone currentZone = TimeZone.getDefault(); - try { - TimeZone.setDefault(TimeZone.getTimeZone(zoneId)); - action.invoke(); - } finally { - TimeZone.setDefault(currentZone); - } - } - - protected void withSQLConf(Map conf, Action action) { - SQLConf sqlConf = SQLConf.get(); - - Map currentConfValues = Maps.newHashMap(); - conf.keySet() - .forEach( - confKey -> { - if (sqlConf.contains(confKey)) { - String currentConfValue = sqlConf.getConfString(confKey); - currentConfValues.put(confKey, currentConfValue); - } - }); - - conf.forEach( - (confKey, confValue) -> { - if (SQLConf.isStaticConfigKey(confKey)) { - throw new RuntimeException("Cannot modify the value of a static config: " + confKey); - } - sqlConf.setConfString(confKey, confValue); - }); - - try { - action.invoke(); - } finally { - conf.forEach( - (confKey, confValue) -> { - if (currentConfValues.containsKey(confKey)) { - sqlConf.setConfString(confKey, currentConfValues.get(confKey)); - } else { - sqlConf.unsetConf(confKey); - } - }); - } - } - - protected Dataset jsonToDF(String schema, String... records) { - Dataset jsonDF = spark.createDataset(ImmutableList.copyOf(records), Encoders.STRING()); - return spark.read().schema(schema).json(jsonDF); - } - - protected void append(String table, String... jsonRecords) { - try { - String schema = spark.table(table).schema().toDDL(); - Dataset df = jsonToDF(schema, jsonRecords); - df.coalesce(1).writeTo(table).append(); - } catch (NoSuchTableException e) { - throw new RuntimeException("Failed to write data", e); - } - } - - protected String tablePropsAsString(Map tableProps) { - StringBuilder stringBuilder = new StringBuilder(); - - for (Map.Entry property : tableProps.entrySet()) { - if (stringBuilder.length() > 0) { - stringBuilder.append(", "); - } - stringBuilder.append(String.format("'%s' '%s'", property.getKey(), property.getValue())); - } - - return stringBuilder.toString(); - } - - protected SparkPlan executeAndKeepPlan(String query, Object... args) { - return executeAndKeepPlan(() -> sql(query, args)); - } - - protected SparkPlan executeAndKeepPlan(Action action) { - AtomicReference executedPlanRef = new AtomicReference<>(); - - QueryExecutionListener listener = - new QueryExecutionListener() { - @Override - public void onSuccess(String funcName, QueryExecution qe, long durationNs) { - executedPlanRef.set(qe.executedPlan()); - } - - @Override - public void onFailure(String funcName, QueryExecution qe, Exception exception) {} - }; - - spark.listenerManager().register(listener); - - action.invoke(); - - try { - spark.sparkContext().listenerBus().waitUntilEmpty(); - } catch (TimeoutException e) { - throw new RuntimeException("Timeout while waiting for processing events", e); - } - - SparkPlan executedPlan = executedPlanRef.get(); - if (executedPlan instanceof AdaptiveSparkPlanExec) { - return ((AdaptiveSparkPlanExec) executedPlan).executedPlan(); - } else { - return executedPlan; - } - } - - @FunctionalInterface - protected interface Action { - void invoke(); - } -} diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java index fc17547fad41..803603a9e3d9 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java @@ -20,8 +20,11 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.util.List; import java.util.stream.Collectors; import org.apache.iceberg.ChangelogOperation; @@ -41,17 +44,15 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.TestBase; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class TestChangelogReader extends SparkTestBase { +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestChangelogReader extends TestBase { private static final Schema SCHEMA = new Schema( required(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get())); @@ -64,9 +65,9 @@ public class TestChangelogReader extends SparkTestBase { private DataFile dataFile1; private DataFile dataFile2; - @Rule public TemporaryFolder temp = new TemporaryFolder(); + @TempDir private Path temp; - @Before + @BeforeEach public void before() throws IOException { table = catalog.createTable(TableIdentifier.of("default", "test"), SCHEMA, SPEC); // create some data @@ -85,7 +86,7 @@ public void before() throws IOException { dataFile2 = writeDataFile(records2); } - @After + @AfterEach public void after() { catalog.dropTable(TableIdentifier.of("default", "test")); } @@ -176,7 +177,7 @@ public void testDataFileRewrite() throws IOException { reader.close(); } - Assert.assertEquals("Should have no rows", 0, rows.size()); + assertThat(rows).as("Should have no rows").isEmpty(); } @Test @@ -254,6 +255,9 @@ private Object[] toJava(InternalRow row) { private DataFile writeDataFile(List records) throws IOException { // records all use IDs that are in bucket id_bucket=0 return FileHelpers.writeDataFile( - table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), records); + table, + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + TestHelpers.Row.of(0), + records); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java index 4ee77345dbe5..bf3bcacbfbe9 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java @@ -20,14 +20,20 @@ import static org.apache.iceberg.PlanningMode.DISTRIBUTED; import static org.apache.iceberg.PlanningMode.LOCAL; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -37,48 +43,75 @@ import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkTableUtil; -import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.TestBase; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.TableIdentifier; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestIdentityPartitionData extends SparkTestBase { +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestIdentityPartitionData extends TestBase { private static final Configuration CONF = new Configuration(); private static final HadoopTables TABLES = new HadoopTables(CONF); - @Parameterized.Parameters(name = "format = {0}, vectorized = {1}, planningMode = {2}") + @Parameters(name = "format = {0}, vectorized = {1}, properties = {2}") public static Object[][] parameters() { return new Object[][] { - {"parquet", false, LOCAL}, - {"parquet", true, DISTRIBUTED}, - {"avro", false, LOCAL}, - {"orc", false, DISTRIBUTED}, - {"orc", true, LOCAL}, + { + FileFormat.PARQUET, + false, + ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, "parquet", + TableProperties.DATA_PLANNING_MODE, LOCAL.modeName(), + TableProperties.DELETE_PLANNING_MODE, LOCAL.modeName()) + }, + { + FileFormat.PARQUET, + true, + ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, "parquet", + TableProperties.DATA_PLANNING_MODE, DISTRIBUTED.modeName(), + TableProperties.DELETE_PLANNING_MODE, DISTRIBUTED.modeName()) + }, + { + FileFormat.AVRO, + false, + ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, "avro", + TableProperties.DATA_PLANNING_MODE, LOCAL.modeName(), + TableProperties.DELETE_PLANNING_MODE, LOCAL.modeName()) + }, + { + FileFormat.ORC, + false, + ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, "orc", + TableProperties.DATA_PLANNING_MODE, DISTRIBUTED.modeName(), + TableProperties.DELETE_PLANNING_MODE, DISTRIBUTED.modeName()) + }, + { + FileFormat.ORC, + true, + ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, "orc", + TableProperties.DATA_PLANNING_MODE, LOCAL.modeName(), + TableProperties.DELETE_PLANNING_MODE, LOCAL.modeName()) + }, }; } - private final String format; - private final boolean vectorized; - private final Map properties; + @Parameter(index = 0) + private FileFormat format; - public TestIdentityPartitionData(String format, boolean vectorized, PlanningMode planningMode) { - this.format = format; - this.vectorized = vectorized; - this.properties = - ImmutableMap.of( - TableProperties.DEFAULT_FILE_FORMAT, format, - TableProperties.DATA_PLANNING_MODE, planningMode.modeName(), - TableProperties.DELETE_PLANNING_MODE, planningMode.modeName()); - } + @Parameter(index = 1) + private boolean vectorized; + + @Parameter(index = 2) + private Map properties; private static final Schema LOG_SCHEMA = new Schema( @@ -100,7 +133,7 @@ public TestIdentityPartitionData(String format, boolean vectorized, PlanningMode LogMessage.warn("2020-02-04", "warn event 1"), LogMessage.debug("2020-02-04", "debug event 5")); - @Rule public TemporaryFolder temp = new TemporaryFolder(); + @TempDir private Path temp; private final PartitionSpec spec = PartitionSpec.builderFor(LOG_SCHEMA).identity("date").identity("level").build(); @@ -113,10 +146,10 @@ public TestIdentityPartitionData(String format, boolean vectorized, PlanningMode * also fail. */ private void setupParquet() throws Exception { - File location = temp.newFolder("logs"); - File hiveLocation = temp.newFolder("hive"); + File location = Files.createTempDirectory(temp, "logs").toFile(); + File hiveLocation = Files.createTempDirectory(temp, "hive").toFile(); String hiveTable = "hivetable"; - Assert.assertTrue("Temp folder should exist", location.exists()); + assertThat(location).as("Temp folder should exist").exists(); this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message"); @@ -139,13 +172,13 @@ private void setupParquet() throws Exception { spark, new TableIdentifier(hiveTable), table, location.toString()); } - @Before + @BeforeEach public void setupTable() throws Exception { - if (format.equals("parquet")) { + if (format.equals(FileFormat.PARQUET)) { setupParquet(); } else { - File location = temp.newFolder("logs"); - Assert.assertTrue("Temp folder should exist", location.exists()); + File location = Files.createTempDirectory(temp, "logs").toFile(); + assertThat(location).as("Temp folder should exist").exists(); this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString()); this.logs = @@ -159,7 +192,7 @@ public void setupTable() throws Exception { } } - @Test + @TestTemplate public void testFullProjection() { List expected = logs.orderBy("id").collectAsList(); List actual = @@ -171,10 +204,10 @@ public void testFullProjection() { .orderBy("id") .select("id", "date", "level", "message") .collectAsList(); - Assert.assertEquals("Rows should match", expected, actual); + assertThat(actual).as("Rows should match").isEqualTo(expected); } - @Test + @TestTemplate public void testProjections() { String[][] cases = new String[][] { @@ -210,8 +243,9 @@ public void testProjections() { .select("id", ordering) .orderBy("id") .collectAsList(); - Assert.assertEquals( - "Rows should match for ordering: " + Arrays.toString(ordering), expected, actual); + assertThat(actual) + .as("Rows should match for ordering: " + Arrays.toString(ordering)) + .isEqualTo(expected); } } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java index 1fd017025e58..4fd018149365 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java @@ -26,9 +26,13 @@ import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED; import static org.apache.spark.sql.functions.expr; import static org.apache.spark.sql.functions.lit; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -36,6 +40,9 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -49,26 +56,22 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.TestBase; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.types.StructType; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestSparkMetadataColumns extends SparkTestBase { +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestSparkMetadataColumns extends TestBase { private static final String TABLE_NAME = "test_table"; private static final Schema SCHEMA = @@ -84,37 +87,41 @@ public class TestSparkMetadataColumns extends SparkTestBase { .addField("zero", 1, "id_zero") .build(); - @Parameterized.Parameters(name = "fileFormat = {0}, vectorized = {1}, formatVersion = {2}") + @Parameters(name = "fileFormat = {0}, vectorized = {1}, formatVersion = {2}") public static Object[][] parameters() { return new Object[][] { {FileFormat.PARQUET, false, 1}, {FileFormat.PARQUET, true, 1}, {FileFormat.PARQUET, false, 2}, {FileFormat.PARQUET, true, 2}, + {FileFormat.PARQUET, false, 3}, + {FileFormat.PARQUET, true, 3}, {FileFormat.AVRO, false, 1}, {FileFormat.AVRO, false, 2}, + {FileFormat.AVRO, false, 3}, {FileFormat.ORC, false, 1}, {FileFormat.ORC, true, 1}, {FileFormat.ORC, false, 2}, {FileFormat.ORC, true, 2}, + {FileFormat.ORC, false, 3}, + {FileFormat.ORC, true, 3}, }; } - @Rule public TemporaryFolder temp = new TemporaryFolder(); + @TempDir private Path temp; - private final FileFormat fileFormat; - private final boolean vectorized; - private final int formatVersion; + @Parameter(index = 0) + private FileFormat fileFormat; - private Table table = null; + @Parameter(index = 1) + private boolean vectorized; - public TestSparkMetadataColumns(FileFormat fileFormat, boolean vectorized, int formatVersion) { - this.fileFormat = fileFormat; - this.vectorized = vectorized; - this.formatVersion = formatVersion; - } + @Parameter(index = 2) + private int formatVersion; + + private Table table = null; - @BeforeClass + @BeforeAll public static void setupSpark() { ImmutableMap config = ImmutableMap.of( @@ -128,20 +135,21 @@ public static void setupSpark() { (key, value) -> spark.conf().set("spark.sql.catalog.spark_catalog." + key, value)); } - @Before + @BeforeEach public void setupTable() throws IOException { createAndInitTable(); } - @After + @AfterEach public void dropTable() { TestTables.clearTables(); } - @Test + @TestTemplate public void testSpecAndPartitionMetadataColumns() { // TODO: support metadata structs in vectorized ORC reads - Assume.assumeFalse(fileFormat == FileFormat.ORC && vectorized); + assumeThat(fileFormat).isNotEqualTo(FileFormat.ORC); + assumeThat(vectorized).isFalse(); sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME); @@ -172,7 +180,7 @@ public void testSpecAndPartitionMetadataColumns() { sql("SELECT _spec_id, _partition FROM %s ORDER BY _spec_id", TABLE_NAME)); } - @Test + @TestTemplate public void testPartitionMetadataColumnWithManyColumns() { List fields = Lists.newArrayList(Types.NestedField.required(0, "id", Types.LongType.get())); @@ -204,7 +212,7 @@ public void testPartitionMetadataColumnWithManyColumns() { .mode("append") .save(TABLE_NAME); - Assert.assertEquals(2, spark.table(TABLE_NAME).select("*", "_partition").count()); + assertThat(spark.table(TABLE_NAME).select("*", "_partition").count()).isEqualTo(2); List expected = ImmutableList.of(row(row(0L), 0L, "0", "0", "0"), row(row(1L), 1L, "1", "1", "1")); assertEquals( @@ -213,9 +221,9 @@ public void testPartitionMetadataColumnWithManyColumns() { sql("SELECT _partition, id, c999, c1000, c1001 FROM %s ORDER BY id", TABLE_NAME)); } - @Test + @TestTemplate public void testPositionMetadataColumnWithMultipleRowGroups() throws NoSuchTableException { - Assume.assumeTrue(fileFormat == FileFormat.PARQUET); + assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET); table.updateProperties().set(PARQUET_ROW_GROUP_SIZE_BYTES, "100").commit(); @@ -231,15 +239,15 @@ public void testPositionMetadataColumnWithMultipleRowGroups() throws NoSuchTable .withColumn("data", lit("ABCDEF")); df.coalesce(1).writeTo(TABLE_NAME).append(); - Assert.assertEquals(200, spark.table(TABLE_NAME).count()); + assertThat(spark.table(TABLE_NAME).count()).isEqualTo(200); List expectedRows = ids.stream().map(this::row).collect(Collectors.toList()); assertEquals("Rows must match", expectedRows, sql("SELECT _pos FROM %s", TABLE_NAME)); } - @Test + @TestTemplate public void testPositionMetadataColumnWithMultipleBatches() throws NoSuchTableException { - Assume.assumeTrue(fileFormat == FileFormat.PARQUET); + assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET); table.updateProperties().set(PARQUET_BATCH_SIZE, "1000").commit(); @@ -255,13 +263,13 @@ public void testPositionMetadataColumnWithMultipleBatches() throws NoSuchTableEx .withColumn("data", lit("ABCDEF")); df.coalesce(1).writeTo(TABLE_NAME).append(); - Assert.assertEquals(7500, spark.table(TABLE_NAME).count()); + assertThat(spark.table(TABLE_NAME).count()).isEqualTo(7500); List expectedRows = ids.stream().map(this::row).collect(Collectors.toList()); assertEquals("Rows must match", expectedRows, sql("SELECT _pos FROM %s", TABLE_NAME)); } - @Test + @TestTemplate public void testPartitionMetadataColumnWithUnknownTransforms() { // replace the table spec to include an unknown transform TableOperations ops = ((HasTableOperations) table).operations(); @@ -273,7 +281,7 @@ public void testPartitionMetadataColumnWithUnknownTransforms() { .hasMessage("Cannot build table partition type, unknown transforms: [zero]"); } - @Test + @TestTemplate public void testConflictingColumns() { table .updateSchema() @@ -325,6 +333,12 @@ private void createAndInitTable() throws IOException { !vectorized, "File format %s does not support vectorized reads", fileFormat); } - this.table = TestTables.create(temp.newFolder(), TABLE_NAME, SCHEMA, SPEC, properties); + this.table = + TestTables.create( + Files.createTempDirectory(temp, "junit").toFile(), + TABLE_NAME, + SCHEMA, + SPEC, + properties); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java index ac674e2e62e8..8b1e3fbfc77c 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -19,9 +19,11 @@ package org.apache.iceberg.spark.source; import static org.apache.iceberg.Files.localOutput; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.time.LocalDateTime; import java.util.List; import java.util.UUID; @@ -31,6 +33,9 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -41,25 +46,22 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkReadOptions; -import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.TestBase; import org.apache.iceberg.spark.data.GenericsHelpers; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestTimestampWithoutZone extends SparkTestBase { +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestTimestampWithoutZone extends TestBase { private static final Configuration CONF = new Configuration(); private static final HadoopTables TABLES = new HadoopTables(CONF); @@ -71,53 +73,49 @@ public class TestTimestampWithoutZone extends SparkTestBase { private static SparkSession spark = null; - @BeforeClass + @BeforeAll public static void startSpark() { TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate(); } - @AfterClass + @AfterAll public static void stopSpark() { SparkSession currentSpark = TestTimestampWithoutZone.spark; TestTimestampWithoutZone.spark = null; currentSpark.stop(); } - @Rule public TemporaryFolder temp = new TemporaryFolder(); + @TempDir private Path temp; - private final String format; - private final boolean vectorized; + @Parameter(index = 0) + private FileFormat fileFormat; - @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + @Parameter(index = 1) + private boolean vectorized; + + @Parameters(name = "format = {0}, vectorized = {1}") public static Object[][] parameters() { return new Object[][] { - {"parquet", false}, - {"parquet", true}, - {"avro", false} + {FileFormat.PARQUET, false}, + {FileFormat.PARQUET, true}, + {FileFormat.AVRO, false} }; } - public TestTimestampWithoutZone(String format, boolean vectorized) { - this.format = format; - this.vectorized = vectorized; - } - private File parent = null; private File unpartitioned = null; private List records = null; - @Before + @BeforeEach public void writeUnpartitionedTable() throws IOException { - this.parent = temp.newFolder("TestTimestampWithoutZone"); + this.parent = temp.resolve("TestTimestampWithoutZone").toFile(); this.unpartitioned = new File(parent, "unpartitioned"); File dataFolder = new File(unpartitioned, "data"); - Assert.assertTrue("Mkdir should succeed", dataFolder.mkdirs()); + assertThat(dataFolder.mkdirs()).as("Mkdir should succeed").isTrue(); Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), unpartitioned.toString()); Schema tableSchema = table.schema(); // use the table schema because ids are reassigned - FileFormat fileFormat = FileFormat.fromString(format); - File testFile = new File(dataFolder, fileFormat.addExtension(UUID.randomUUID().toString())); // create records using the table's schema @@ -138,12 +136,12 @@ public void writeUnpartitionedTable() throws IOException { table.newAppend().appendFile(file).commit(); } - @Test + @TestTemplate public void testUnpartitionedTimestampWithoutZone() { assertEqualsSafe(SCHEMA.asStruct(), records, read(unpartitioned.toString(), vectorized)); } - @Test + @TestTemplate public void testUnpartitionedTimestampWithoutZoneProjection() { Schema projection = SCHEMA.select("id", "ts"); assertEqualsSafe( @@ -152,7 +150,7 @@ public void testUnpartitionedTimestampWithoutZoneProjection() { read(unpartitioned.toString(), vectorized, "id", "ts")); } - @Test + @TestTemplate public void testUnpartitionedTimestampWithoutZoneAppend() { spark .read() @@ -182,7 +180,7 @@ private static Record projectFlat(Schema projection, Record record) { public static void assertEqualsSafe( Types.StructType struct, List expected, List actual) { - Assert.assertEquals("Number of results should match expected", expected.size(), actual.size()); + assertThat(actual).as("Number of results should match expected").hasSameSizeAs(expected); for (int i = 0; i < expected.size(); i += 1) { GenericsHelpers.assertEqualsSafe(struct, expected.get(i), actual.get(i)); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java index 52d6ff8c9c8b..803603a9e3d9 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java @@ -177,7 +177,7 @@ public void testDataFileRewrite() throws IOException { reader.close(); } - assertThat(rows).as("Should have no rows").hasSize(0); + assertThat(rows).as("Should have no rows").isEmpty(); } @Test diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java index 35a675029c1c..bf3bcacbfbe9 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; @@ -61,7 +62,7 @@ public class TestIdentityPartitionData extends TestBase { public static Object[][] parameters() { return new Object[][] { { - "parquet", + FileFormat.PARQUET, false, ImmutableMap.of( TableProperties.DEFAULT_FILE_FORMAT, "parquet", @@ -69,7 +70,7 @@ public static Object[][] parameters() { TableProperties.DELETE_PLANNING_MODE, LOCAL.modeName()) }, { - "parquet", + FileFormat.PARQUET, true, ImmutableMap.of( TableProperties.DEFAULT_FILE_FORMAT, "parquet", @@ -77,7 +78,7 @@ public static Object[][] parameters() { TableProperties.DELETE_PLANNING_MODE, DISTRIBUTED.modeName()) }, { - "avro", + FileFormat.AVRO, false, ImmutableMap.of( TableProperties.DEFAULT_FILE_FORMAT, "avro", @@ -85,7 +86,7 @@ public static Object[][] parameters() { TableProperties.DELETE_PLANNING_MODE, LOCAL.modeName()) }, { - "orc", + FileFormat.ORC, false, ImmutableMap.of( TableProperties.DEFAULT_FILE_FORMAT, "orc", @@ -93,7 +94,7 @@ public static Object[][] parameters() { TableProperties.DELETE_PLANNING_MODE, DISTRIBUTED.modeName()) }, { - "orc", + FileFormat.ORC, true, ImmutableMap.of( TableProperties.DEFAULT_FILE_FORMAT, "orc", @@ -104,7 +105,7 @@ public static Object[][] parameters() { } @Parameter(index = 0) - private String format; + private FileFormat format; @Parameter(index = 1) private boolean vectorized; @@ -173,7 +174,7 @@ private void setupParquet() throws Exception { @BeforeEach public void setupTable() throws Exception { - if (format.equals("parquet")) { + if (format.equals(FileFormat.PARQUET)) { setupParquet(); } else { File location = Files.createTempDirectory(temp, "logs").toFile(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java index 306444b9f29f..8b1e3fbfc77c 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -88,7 +88,7 @@ public static void stopSpark() { @TempDir private Path temp; @Parameter(index = 0) - private String format; + private FileFormat fileFormat; @Parameter(index = 1) private boolean vectorized; @@ -96,9 +96,9 @@ public static void stopSpark() { @Parameters(name = "format = {0}, vectorized = {1}") public static Object[][] parameters() { return new Object[][] { - {"parquet", false}, - {"parquet", true}, - {"avro", false} + {FileFormat.PARQUET, false}, + {FileFormat.PARQUET, true}, + {FileFormat.AVRO, false} }; } @@ -116,8 +116,6 @@ public void writeUnpartitionedTable() throws IOException { Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), unpartitioned.toString()); Schema tableSchema = table.schema(); // use the table schema because ids are reassigned - FileFormat fileFormat = FileFormat.fromString(format); - File testFile = new File(dataFolder, fileFormat.addExtension(UUID.randomUUID().toString())); // create records using the table's schema