diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java similarity index 88% rename from mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java rename to mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java index 456af19ee377..4a87ede341c4 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java @@ -28,10 +28,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; @@ -43,8 +41,6 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TestHelpers.Row; import org.apache.iceberg.catalog.TableIdentifier; @@ -77,10 +73,11 @@ import static org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -public abstract class HiveIcebergStorageHandlerBaseTest { +public class TestHiveIcebergStorageHandler { + private static final FileFormat[] FILE_FORMATS = + new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}; - @Rule - public TemporaryFolder temp = new TemporaryFolder(); + private static final String[] EXECUTION_ENGINES = new String[] {"mr", "tez"}; private static final Schema CUSTOMER_SCHEMA = new Schema( optional(1, "customer_id", Types.LongType.get()), @@ -145,41 +142,47 @@ public abstract class HiveIcebergStorageHandlerBaseTest { 
ImmutableSet.of("bucketing_version", StatsSetupConst.ROW_COUNT, StatsSetupConst.RAW_DATA_SIZE, StatsSetupConst.TOTAL_SIZE, StatsSetupConst.NUM_FILES, "numFilesErasureCoded"); - private static TestHiveShell shell; - private static final List SUPPORTED_TYPES = ImmutableList.of(Types.BooleanType.get(), Types.IntegerType.get(), Types.LongType.get(), Types.FloatType.get(), Types.DoubleType.get(), Types.DateType.get(), Types.TimestampType.withZone(), Types.TimestampType.withoutZone(), Types.StringType.get(), Types.BinaryType.get(), Types.DecimalType.of(3, 1)); - private TestTables testTables; - - public abstract TestTables testTables(Configuration conf, TemporaryFolder tmp) throws IOException; - - @Parameters(name = "fileFormat={0}, engine={1}") + @Parameters(name = "fileFormat={0}, engine={1}, catalog={2}") public static Collection parameters() { - Collection testParams = new ArrayList<>(); - testParams.add(new Object[] { FileFormat.PARQUET, "mr" }); - testParams.add(new Object[] { FileFormat.ORC, "mr" }); - testParams.add(new Object[] { FileFormat.AVRO, "mr" }); - - // include Tez tests only for Java 8 String javaVersion = System.getProperty("java.specification.version"); - if (javaVersion.equals("1.8")) { - testParams.add(new Object[] { FileFormat.PARQUET, "tez" }); - testParams.add(new Object[] { FileFormat.ORC, "tez" }); - testParams.add(new Object[] { FileFormat.AVRO, "tez" }); + + Collection testParams = new ArrayList<>(); + for (FileFormat fileFormat : FILE_FORMATS) { + for (String engine : EXECUTION_ENGINES) { + // include Tez tests only for Java 8 + if (javaVersion.equals("1.8") || "mr".equals(engine)) { + for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) { + testParams.add(new Object[] {fileFormat, engine, testTableType}); + } + } + } } + return testParams; } + private static TestHiveShell shell; + + private TestTables testTables; + @Parameter(0) public FileFormat fileFormat; @Parameter(1) public String executionEngine; + @Parameter(2) 
+ public TestTables.TestTableType testTableType; + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + @BeforeClass public static void beforeClass() { shell = new TestHiveShell(); @@ -196,7 +199,7 @@ public static void afterClass() { @Before public void before() throws IOException { shell.openSession(); - testTables = testTables(shell.metastore().hiveConf(), temp); + testTables = testTableType.instance(shell.metastore().hiveConf(), temp); for (Map.Entry property : testTables.properties().entrySet()) { shell.setHiveSessionValue(property.getKey(), property.getValue()); } @@ -222,7 +225,7 @@ public void after() throws Exception { @Test public void testScanEmptyTable() throws IOException { Schema emptySchema = new Schema(required(1, "empty", Types.StringType.get())); - createTable("empty", emptySchema, ImmutableList.of()); + testTables.createTable(shell, "empty", emptySchema, fileFormat, ImmutableList.of()); List rows = shell.executeStatement("SELECT * FROM default.empty"); Assert.assertEquals(0, rows.size()); @@ -230,7 +233,7 @@ public void testScanEmptyTable() throws IOException { @Test public void testScanTable() throws IOException { - createTable("customers", CUSTOMER_SCHEMA, CUSTOMER_RECORDS); + testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS); // Single fetch task: no MR job. 
List rows = shell.executeStatement("SELECT * FROM default.customers"); @@ -252,8 +255,8 @@ public void testScanTable() throws IOException { @Test public void testJoinTables() throws IOException { - createTable("customers", CUSTOMER_SCHEMA, CUSTOMER_RECORDS); - createTable("orders", ORDER_SCHEMA, ORDER_RECORDS); + testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS); + testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, ORDER_RECORDS); List rows = shell.executeStatement( "SELECT c.customer_id, c.first_name, o.order_id, o.total " + @@ -274,7 +277,7 @@ public void testDecimalTableWithPredicateLiterals() throws IOException { .add(new BigDecimal("100.56")) .add(new BigDecimal("100.57")) .build(); - createTable("dec_test", schema, records); + testTables.createTable(shell, "dec_test", schema, fileFormat, records); // Use integer literal in predicate List rows = shell.executeStatement("SELECT * FROM default.dec_test where decimal_field >= 85"); @@ -309,7 +312,7 @@ public void testJoinTablesSupportedTypes() throws IOException { Schema schema = new Schema(required(1, columnName, type)); List records = TestHelper.generateRandomRecords(schema, 1, 0L); - createTable(tableName, schema, records); + testTables.createTable(shell, tableName, schema, fileFormat, records); List queryResult = shell.executeStatement("select s." + columnName + ", h." + columnName + " from default." + tableName + " s join default." + tableName + " h on h." + columnName + "=s." 
+ columnName); @@ -328,7 +331,7 @@ public void testSelectDistinctFromTable() throws IOException { Schema schema = new Schema(required(1, columnName, type)); List records = TestHelper.generateRandomRecords(schema, 4, 0L); int size = records.stream().map(r -> r.getField(columnName)).collect(Collectors.toSet()).size(); - createTable(tableName, schema, records); + testTables.createTable(shell, tableName, schema, fileFormat, records); List queryResult = shell.executeStatement("select count(distinct(" + columnName + ")) from default." + tableName); int distincIds = ((Long) queryResult.get(0)[0]).intValue(); @@ -348,7 +351,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc "'dummy'='test')"); // Check the Iceberg table data - org.apache.iceberg.Table icebergTable = loadTable(identifier); + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); Assert.assertEquals(IDENTITY_SPEC, icebergTable.spec()); @@ -385,7 +388,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc // Check if the table was really dropped even from the Catalog AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, "Table does not exist", () -> { - loadTable(identifier); + testTables.loadTable(identifier); } ); } else { @@ -404,7 +407,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc // Check if we drop an exception when trying to load the table AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, "Table does not exist", () -> { - loadTable(identifier); + testTables.loadTable(identifier); } ); @@ -429,7 +432,7 @@ public void testCreateTableWithoutSpec() throws TException, InterruptedException "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); // Check the Iceberg table partition data - 
org.apache.iceberg.Table icebergTable = loadTable(identifier); + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals(SPEC, icebergTable.spec()); // Check the HMS table parameters @@ -461,7 +464,7 @@ public void testCreateTableWithUnpartitionedSpec() throws TException, Interrupte "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(SPEC) + "')"); // Check the Iceberg table partition data - org.apache.iceberg.Table icebergTable = loadTable(identifier); + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals(SPEC, icebergTable.spec()); // Check the HMS table parameters @@ -494,7 +497,7 @@ public void testDeleteBackingTable() throws TException, IOException, Interrupted shell.executeStatement("DROP TABLE customers"); // Check if the table remains - loadTable(identifier); + testTables.loadTable(identifier); } else { // Check the HMS table parameters org.apache.hadoop.hive.metastore.api.Table hmsTable = @@ -507,7 +510,7 @@ public void testDeleteBackingTable() throws TException, IOException, Interrupted // Check if we drop an exception when trying to drop the table AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, "Table does not exist", () -> { - loadTable(identifier); + testTables.loadTable(identifier); } ); @@ -557,7 +560,8 @@ public void testCreateTableError() { @Test public void testCreateTableAboveExistingTable() throws TException, IOException, InterruptedException { // Create the Iceberg table - createIcebergTable("customers", COMPLEX_SCHEMA, Collections.emptyList()); + testTables.createIcebergTable(shell.getHiveConf(), "customers", COMPLEX_SCHEMA, FileFormat.PARQUET, + Collections.emptyList()); if (Catalogs.hiveCatalog(shell.getHiveConf())) { @@ -594,7 +598,7 @@ public void testCreateTableAboveExistingTable() throws TException, IOException, @Test public void testColumnSelection() throws IOException { - createTable("customers", 
CUSTOMER_SCHEMA, CUSTOMER_RECORDS); + testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS); List outOfOrderColumns = shell .executeStatement("SELECT first_name, customer_id, last_name FROM default.customers"); @@ -628,7 +632,7 @@ public void testColumnSelection() throws IOException { @Test public void selectSameColumnTwice() throws IOException { - createTable("customers", CUSTOMER_SCHEMA, CUSTOMER_RECORDS); + testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS); List columns = shell.executeStatement("SELECT first_name, first_name FROM default.customers"); @@ -647,11 +651,11 @@ public void testCreateTableWithColumnSpecification() throws IOException { testTables.locationForCreateTableSQL(identifier)); // Check the Iceberg table data - org.apache.iceberg.Table icebergTable = loadTable(identifier); + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); Assert.assertEquals(SPEC, icebergTable.spec()); - appendIcebergTable(icebergTable, fileFormat, null, CUSTOMER_RECORDS); + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, null, CUSTOMER_RECORDS); List descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC"); @@ -684,13 +688,16 @@ public void testCreatePartitionedTable() throws IOException { "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " + "'" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); - org.apache.iceberg.Table icebergTable = loadTable(identifier); + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); Assert.assertEquals(spec, icebergTable.spec()); - appendIcebergTable(icebergTable, fileFormat, Row.of("Alice"), 
Arrays.asList(CUSTOMER_RECORDS.get(0))); - appendIcebergTable(icebergTable, fileFormat, Row.of("Bob"), Arrays.asList(CUSTOMER_RECORDS.get(1))); - appendIcebergTable(icebergTable, fileFormat, Row.of("Trudy"), Arrays.asList(CUSTOMER_RECORDS.get(2))); + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Alice"), + Arrays.asList(CUSTOMER_RECORDS.get(0))); + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Bob"), + Arrays.asList(CUSTOMER_RECORDS.get(1))); + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Trudy"), + Arrays.asList(CUSTOMER_RECORDS.get(2))); List descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC"); @@ -715,7 +722,7 @@ public void testCreateTableWithColumnSpecificationHierarchy() { testTables.locationForCreateTableSQL(identifier)); // Check the Iceberg table data - org.apache.iceberg.Table icebergTable = loadTable(identifier); + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals(COMPLEX_SCHEMA.asStruct(), icebergTable.schema().asStruct()); } @@ -743,7 +750,7 @@ public void testCreateTableWithAllSupportedTypes() { testTables.locationForCreateTableSQL(identifier)); // Check the Iceberg table data - org.apache.iceberg.Table icebergTable = loadTable(identifier); + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals(allSupportedSchema.asStruct(), icebergTable.schema().asStruct()); } @@ -765,31 +772,11 @@ public void testCreateTableWithNotSupportedTypes() { } } - protected void createTable(String tableName, Schema schema, List records) - throws IOException { - Table table = createIcebergTable(tableName, schema, records); - createHiveTable(tableName, table.location()); - } - - protected Table createIcebergTable(String tableName, Schema schema, List records) - throws IOException { - String identifier = testTables.identifier("default." 
+ tableName); - TestHelper helper = new TestHelper( - shell.metastore().hiveConf(), testTables.tables(), identifier, schema, SPEC, fileFormat, temp); - Table table = helper.createTable(); - - if (!records.isEmpty()) { - helper.appendToTable(helper.writeFile(null, records)); - } - - return table; - } - @Test public void testArrayOfPrimitivesInTable() throws IOException { Schema schema = new Schema(required(1, "arrayofprimitives", Types.ListType.ofRequired(2, Types.IntegerType.get()))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "arraytable"); + List records = testTables.createTableWithGeneratedRecords(shell, "arraytable", schema, fileFormat, 1); // access a single element from the array for (int i = 0; i < records.size(); i++) { List expectedList = (List) records.get(i).getField("arrayofprimitives"); @@ -807,7 +794,7 @@ public void testArrayOfArraysInTable() throws IOException { new Schema( required(1, "arrayofarrays", Types.ListType.ofRequired(2, Types.ListType.ofRequired(3, Types.DateType.get())))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "arraytable"); + List records = testTables.createTableWithGeneratedRecords(shell, "arraytable", schema, fileFormat, 1); // access an element from a matrix for (int i = 0; i < records.size(); i++) { List expectedList = (List) records.get(i).getField("arrayofarrays"); @@ -829,7 +816,7 @@ public void testArrayOfMapsInTable() throws IOException { new Schema(required(1, "arrayofmaps", Types.ListType .ofRequired(2, Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.BooleanType.get())))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "arraytable"); + List records = testTables.createTableWithGeneratedRecords(shell, "arraytable", schema, fileFormat, 1); // access an element from a map in an array for (int i = 0; i < records.size(); i++) { List expectedList = (List) records.get(i).getField("arrayofmaps"); @@ -852,7 +839,7 @@ public void testArrayOfStructsInTable() 
throws IOException { required(1, "arrayofstructs", Types.ListType.ofRequired(2, Types.StructType .of(required(3, "something", Types.DoubleType.get()), required(4, "someone", Types.LongType.get()), required(5, "somewhere", Types.StringType.get()))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "arraytable"); + List records = testTables.createTableWithGeneratedRecords(shell, "arraytable", schema, fileFormat, 1); // access an element from a struct in an array for (int i = 0; i < records.size(); i++) { List expectedList = (List) records.get(i).getField("arrayofstructs"); @@ -873,7 +860,7 @@ public void testMapOfPrimitivesInTable() throws IOException { Schema schema = new Schema( required(1, "mapofprimitives", Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.IntegerType.get()))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "maptable"); + List records = testTables.createTableWithGeneratedRecords(shell, "maptable", schema, fileFormat, 1); // access a single value from the map for (int i = 0; i < records.size(); i++) { Map expectedMap = (Map) records.get(i).getField("mapofprimitives"); @@ -892,7 +879,7 @@ public void testMapOfArraysInTable() throws IOException { required(1, "mapofarrays", Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.ListType.ofRequired(4, Types.DateType.get())))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "maptable"); + List records = testTables.createTableWithGeneratedRecords(shell, "maptable", schema, fileFormat, 1); // access a single element from a list in a map for (int i = 0; i < records.size(); i++) { Map expectedMap = (Map) records.get(i).getField("mapofarrays"); @@ -912,7 +899,7 @@ public void testMapOfMapsInTable() throws IOException { Schema schema = new Schema( required(1, "mapofmaps", Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.MapType.ofRequired(4, 5, Types.StringType.get(), Types.StringType.get())))); - List records = 
createTableWithGeneratedRecords(schema, 1, 0L, "maptable"); + List records = testTables.createTableWithGeneratedRecords(shell, "maptable", schema, fileFormat, 1); // access a single element from a map in a map for (int i = 0; i < records.size(); i++) { Map expectedMap = (Map) records.get(i).getField("mapofmaps"); @@ -935,7 +922,7 @@ public void testMapOfStructsInTable() throws IOException { Types.StructType.of(required(4, "something", Types.DoubleType.get()), required(5, "someone", Types.LongType.get()), required(6, "somewhere", Types.StringType.get()))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "maptable"); + List records = testTables.createTableWithGeneratedRecords(shell, "maptable", schema, fileFormat, 1); // access a single element from a struct in a map for (int i = 0; i < records.size(); i++) { Map expectedMap = (Map) records.get(i).getField("mapofstructs"); @@ -956,7 +943,7 @@ public void testStructOfPrimitivesInTable() throws IOException { Schema schema = new Schema(required(1, "structofprimitives", Types.StructType.of(required(2, "key", Types.StringType.get()), required(3, "value", Types.IntegerType.get())))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "structtable"); + List records = testTables.createTableWithGeneratedRecords(shell, "structtable", schema, fileFormat, 1); // access a single value in a struct for (int i = 0; i < records.size(); i++) { GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofprimitives"); @@ -974,7 +961,7 @@ public void testStructOfArraysInTable() throws IOException { .of(required(2, "names", Types.ListType.ofRequired(3, Types.StringType.get())), required(4, "birthdays", Types.ListType.ofRequired(5, Types.DateType.get()))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "structtable"); + List records = testTables.createTableWithGeneratedRecords(shell, "structtable", schema, fileFormat, 1); // access an element of an array inside a struct 
for (int i = 0; i < records.size(); i++) { GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofarrays"); @@ -1001,7 +988,7 @@ public void testStructOfMapsInTable() throws IOException { Types.StringType.get(), Types.StringType.get())), required(5, "map2", Types.MapType.ofRequired(6, 7, Types.StringType.get(), Types.IntegerType.get()))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "structtable"); + List records = testTables.createTableWithGeneratedRecords(shell, "structtable", schema, fileFormat, 1); // access a map entry inside a struct for (int i = 0; i < records.size(); i++) { GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofmaps"); @@ -1028,7 +1015,7 @@ public void testStructOfStructsInTable() throws IOException { required(1, "structofstructs", Types.StructType.of(required(2, "struct1", Types.StructType .of(required(3, "key", Types.StringType.get()), required(4, "value", Types.IntegerType.get())))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "structtable"); + List records = testTables.createTableWithGeneratedRecords(shell, "structtable", schema, fileFormat, 1); // access a struct element inside a struct for (int i = 0; i < records.size(); i++) { GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofstructs"); @@ -1040,50 +1027,4 @@ public void testStructOfStructsInTable() throws IOException { Assert.assertEquals(expectedInnerStruct.getField("value"), queryResult.get(0)[1]); } } - - protected void appendIcebergTable(Table table, FileFormat format, StructLike partition, List records) - throws IOException { - TestHelper helper = new TestHelper( - shell.getHiveConf(), null, null, null, null, format, temp); - - helper.setTable(table); - if (!records.isEmpty()) { - helper.appendToTable(helper.writeFile(partition, records)); - } - } - - protected void createHiveTable(String tableName, String location) { - shell.executeStatement(String.format( 
- "CREATE TABLE default.%s " + - "STORED BY '%s' " + - "LOCATION '%s'", - tableName, HiveIcebergStorageHandler.class.getName(), location)); - } - - private Table loadTable(TableIdentifier identifier) { - Properties properties = new Properties(); - properties.put(Catalogs.NAME, identifier.toString()); - String expectedLocation = testTables.loadLocation(identifier); - if (expectedLocation != null) { - properties.put(Catalogs.LOCATION, expectedLocation); - } - - // Check the Iceberg table data - return Catalogs.loadTable(shell.getHiveConf(), properties); - } - - protected String locationForCreateTableSQL(TemporaryFolder root, String tableName) { - return "LOCATION '" + root.getRoot().getPath() + "/hadoop/warehouse/default/" + tableName + "' "; - } - - protected String loadLocation(TemporaryFolder root, String tableName) { - return null; - } - - private List createTableWithGeneratedRecords(Schema schema, int numRecords, long seed, String tableName) - throws IOException { - List records = TestHelper.generateRandomRecords(schema, numRecords, seed); - createTable(tableName, schema, records); - return records; - } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithCustomCatalog.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithCustomCatalog.java deleted file mode 100644 index b0f78bbbe8fd..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithCustomCatalog.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.junit.rules.TemporaryFolder; - -public class TestHiveIcebergStorageHandlerWithCustomCatalog extends HiveIcebergStorageHandlerBaseTest { - - @Override - public TestTables testTables(Configuration conf, TemporaryFolder temp) throws IOException { - return new TestTables.CustomCatalogTestTables(conf, temp); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopCatalog.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopCatalog.java deleted file mode 100644 index 22ade0f2dc5c..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopCatalog.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.junit.rules.TemporaryFolder; - -public class TestHiveIcebergStorageHandlerWithHadoopCatalog extends HiveIcebergStorageHandlerBaseTest { - - @Override - public TestTables testTables(Configuration conf, TemporaryFolder temp) throws IOException { - return new TestTables.HadoopCatalogTestTables(conf, temp); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java deleted file mode 100644 index 9781dbefbe61..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.mr.hive; - -import org.apache.hadoop.conf.Configuration; -import org.junit.rules.TemporaryFolder; - -public class TestHiveIcebergStorageHandlerWithHadoopTables extends HiveIcebergStorageHandlerBaseTest { - - @Override - public TestTables testTables(Configuration conf, TemporaryFolder temp) { - return new TestTables.HadoopTestTables(conf, temp); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java deleted file mode 100644 index 22e34744a1fe..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.mr.hive; - -import org.apache.hadoop.conf.Configuration; -import org.junit.rules.TemporaryFolder; - -public class TestHiveIcebergStorageHandlerWithHiveCatalog extends HiveIcebergStorageHandlerBaseTest { - - @Override - public TestTables testTables(Configuration conf, TemporaryFolder temp) { - return new TestTables.HiveTestTables(conf, temp); - } - - @Override - protected void createHiveTable(String tableName, String location) { - // The Hive catalog has already created the Hive table so there's no need to issue another - // 'CREATE TABLE ...' statement. - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java index 5fcab6d36b95..94efe42fb702 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java @@ -23,24 +23,28 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Collections; +import java.util.List; import java.util.Map; -import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.Tables; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hive.HiveCatalogs; import org.apache.iceberg.hive.MetastoreUtil; -import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.TestCatalogs; +import org.apache.iceberg.mr.TestHelper; import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays; import org.junit.Assert; @@ -48,6 +52,12 @@ // Helper class for setting up and testing various catalog implementations abstract class TestTables { + public static final TestTableType[] ALL_TABLE_TYPES = new TestTableType[] { + TestTableType.HADOOP_TABLE, + TestTableType.HADOOP_CATALOG, + TestTableType.CUSTOM_CATALOG, + TestTableType.HIVE_CATALOG + }; private final Tables tables; protected final TemporaryFolder temp; @@ -83,12 +93,111 @@ public Tables tables() { public abstract String locationForCreateTableSQL(TableIdentifier identifier); /** - * If the {@link Catalogs#LOCATION} is needed for {@link Catalogs#loadTable(Configuration, Properties)} then this - * method should provide the location string. It should return null if the location is not needed. - * @param identifier The table identifier - * @return The location string for loadTable operation + * If an independent Hive table creation is needed for the given Catalog then this should return the Hive SQL + * string which we have to execute. Overridden for HiveCatalog where the Hive table is immediately created + * during the Iceberg table creation so no extra sql execution is required. + * @param identifier The table identifier (the namespace should be non-empty and single level) + * @return The SQL string - which should be executed, null - if it is not needed. 
+ */ + public String createHiveTableSQL(TableIdentifier identifier) { + Preconditions.checkArgument(!identifier.namespace().isEmpty(), "Namespace should not be empty"); + Preconditions.checkArgument(identifier.namespace().levels().length == 1, "Namespace should be single level"); + return String.format("CREATE TABLE %s.%s STORED BY '%s' %s", identifier.namespace(), identifier.name(), + HiveIcebergStorageHandler.class.getName(), locationForCreateTableSQL(identifier)); + } + + /** + * Loads the given table from the actual catalog. Overridden by HadoopTables, since the parameter of the + * {@link Tables#load(String)} should be the full path of the table metadata directory + * @param identifier The table we want to load + * @return The Table loaded from the Catalog */ - public abstract String loadLocation(TableIdentifier identifier); + public Table loadTable(TableIdentifier identifier) { + return tables.load(identifier.toString()); + } + + /** + * Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when + * needed. The table will be in the 'default' database. The table will be populated with the provided List of + * {@link Record}s. + * @param shell The HiveShell used for Hive table creation + * @param tableName The name of the test table + * @param schema The schema used for the table creation + * @param fileFormat The file format used for writing the data + * @param records The records with which the table is populated + * @throws IOException If there is an error writing data + */ + public void createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat, + List records) throws IOException { + createIcebergTable(shell.getHiveConf(), tableName, schema, fileFormat, records); + String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName)); + if (createHiveSQL != null) { + shell.executeStatement(createHiveSQL); + } + } + + /** + * Creates a Hive test table. 
Creates the Iceberg table/data and creates the corresponding Hive table as well when + * needed. The table will be in the 'default' database. The table will be populated with randomly + * generated {@link Record}s. + * @param shell The HiveShell used for Hive table creation + * @param tableName The name of the test table + * @param schema The schema used for the table creation + * @param fileFormat The file format used for writing the data + * @param numRecords The number of records that should be generated and stored in the table + * @throws IOException If there is an error writing data + */ + public List createTableWithGeneratedRecords(TestHiveShell shell, String tableName, Schema schema, + FileFormat fileFormat, int numRecords) throws IOException { + List records = TestHelper.generateRandomRecords(schema, numRecords, 0L); + createTable(shell, tableName, schema, fileFormat, records); + return records; + } + + /** + * Creates an Iceberg table/data without creating the corresponding Hive table. The table will be in the 'default' + * namespace. + * @param configuration The configuration used during the table creation + * @param tableName The name of the test table + * @param schema The schema used for the table creation + * @param fileFormat The file format used for writing the data + * @param records The records with which the table is populated + * @return The created table + * @throws IOException If there is an error writing data + */ + public Table createIcebergTable(Configuration configuration, String tableName, Schema schema, FileFormat fileFormat, + List records) throws IOException { + String identifier = identifier("default." 
+ tableName); + TestHelper helper = new TestHelper(new Configuration(configuration), tables(), identifier, schema, + PartitionSpec.unpartitioned(), fileFormat, temp); + Table table = helper.createTable(); + + if (records != null && !records.isEmpty()) { + helper.appendToTable(helper.writeFile(null, records)); + } + + return table; + } + + /** + * Append more data to the table. + * @param configuration The configuration used during the table creation + * @param table The table to append to + * @param format The file format used for writing the data + * @param partition The partition to write to + * @param records The records which should be added to the table + * @throws IOException If there is an error writing data + */ + public void appendIcebergTable(Configuration configuration, Table table, FileFormat format, StructLike partition, + List records) throws IOException { + TestHelper helper = new TestHelper( + configuration, null, null, null, null, format, temp); + + helper.setTable(table); + if (!records.isEmpty()) { + helper.appendToTable(helper.writeFile(partition, records)); + } + } private static class CatalogToTables implements Tables { @@ -142,10 +251,6 @@ public String locationForCreateTableSQL(TableIdentifier identifier) { return "LOCATION '" + warehouseLocation + TestTables.tablePath(identifier) + "' "; } - @Override - public String loadLocation(TableIdentifier identifier) { - return warehouseLocation + TestTables.tablePath(identifier); - } } static class HadoopCatalogTestTables extends TestTables { @@ -173,14 +278,9 @@ public Map properties() { public String locationForCreateTableSQL(TableIdentifier identifier) { return "LOCATION '" + warehouseLocation + TestTables.tablePath(identifier) + "' "; } - - public String loadLocation(TableIdentifier identifier) { - return null; - } } static class HadoopTestTables extends TestTables { - HadoopTestTables(Configuration conf, TemporaryFolder temp) { super(new HadoopTables(conf), temp); } @@ -206,9 +306,10 @@ 
public String locationForCreateTableSQL(TableIdentifier identifier) { } @Override - public String loadLocation(TableIdentifier identifier) { - return temp.getRoot().getPath() + TestTables.tablePath(identifier); + public Table loadTable(TableIdentifier identifier) { + return tables().load(temp.getRoot().getPath() + TestTables.tablePath(identifier)); } + } static class HiveTestTables extends TestTables { @@ -227,7 +328,8 @@ public String locationForCreateTableSQL(TableIdentifier identifier) { return ""; } - public String loadLocation(TableIdentifier identifier) { + @Override + public String createHiveTableSQL(TableIdentifier identifier) { return null; } } @@ -235,4 +337,29 @@ public String loadLocation(TableIdentifier identifier) { private static String tablePath(TableIdentifier identifier) { return "/" + Joiner.on("/").join(identifier.namespace().levels()) + "/" + identifier.name(); } + + enum TestTableType { + HADOOP_TABLE { + public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) { + return new HadoopTestTables(conf, temporaryFolder); + } + }, + HADOOP_CATALOG { + public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException { + return new HadoopCatalogTestTables(conf, temporaryFolder); + } + }, + CUSTOM_CATALOG { + public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException { + return new CustomCatalogTestTables(conf, temporaryFolder); + } + }, + HIVE_CATALOG { + public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) { + return new HiveTestTables(conf, temporaryFolder); + } + }; + + public abstract TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException; + } }