diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java deleted file mode 100644 index 1c92f7841115..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java +++ /dev/null @@ -1,1036 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import java.io.IOException; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.iceberg.AssertHelpers; -import org.apache.iceberg.BaseMetastoreTableOperations; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.PartitionSpecParser; -import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.TestHelpers.Row; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.hadoop.Util; -import org.apache.iceberg.mr.Catalogs; -import org.apache.iceberg.mr.InputFormatConfig; -import org.apache.iceberg.mr.TestHelper; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; -import org.apache.thrift.TException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.junit.runners.Parameterized.Parameter; -import static org.junit.runners.Parameterized.Parameters; - -@RunWith(Parameterized.class) -public abstract class 
HiveIcebergStorageHandlerBaseTest {
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  private static final Schema CUSTOMER_SCHEMA = new Schema(
-      optional(1, "customer_id", Types.LongType.get()),
-      optional(2, "first_name", Types.StringType.get())
-  );
-
-  private static final List<Record> CUSTOMER_RECORDS = TestHelper.RecordsBuilder.newInstance(CUSTOMER_SCHEMA)
-      .add(0L, "Alice")
-      .add(1L, "Bob")
-      .add(2L, "Trudy")
-      .build();
-
-  private static final Schema ORDER_SCHEMA = new Schema(
-      required(1, "order_id", Types.LongType.get()),
-      required(2, "customer_id", Types.LongType.get()),
-      required(3, "total", Types.DoubleType.get()));
-
-  private static final List<Record> ORDER_RECORDS = TestHelper.RecordsBuilder.newInstance(ORDER_SCHEMA)
-      .add(100L, 0L, 11.11d)
-      .add(101L, 0L, 22.22d)
-      .add(102L, 1L, 33.33d)
-      .build();
-
-  private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
-
-  private static final PartitionSpec IDENTITY_SPEC =
-      PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("customer_id").build();
-
-  private static final Schema COMPLEX_SCHEMA = new Schema(
-      optional(1, "id", Types.LongType.get()),
-      optional(2, "name", Types.StringType.get()),
-      optional(3, "employee_info", Types.StructType.of(
-          optional(7, "employer", Types.StringType.get()),
-          optional(8, "id", Types.LongType.get()),
-          optional(9, "address", Types.StringType.get())
-      )),
-      optional(4, "places_lived", Types.ListType.ofOptional(10, Types.StructType.of(
-          optional(11, "street", Types.StringType.get()),
-          optional(12, "city", Types.StringType.get()),
-          optional(13, "country", Types.StringType.get())
-      ))),
-      optional(5, "memorable_moments", Types.MapType.ofOptional(14, 15,
-          Types.StringType.get(),
-          Types.StructType.of(
-              optional(16, "year", Types.IntegerType.get()),
-              optional(17, "place", Types.StringType.get()),
-              optional(18, "details", Types.StringType.get())
-          ))),
-      optional(6, "current_address", Types.StructType.of(
-          optional(19, "street_address", Types.StructType.of(
-              optional(22, "street_number", Types.IntegerType.get()),
-              optional(23, "street_name", Types.StringType.get()),
-              optional(24, "street_type", Types.StringType.get())
-          )),
-          optional(20, "country", Types.StringType.get()),
-          optional(21, "postal_code", Types.StringType.get())
-      ))
-  );
-
-  private static final Set<String> IGNORED_PARAMS =
-      ImmutableSet.of("bucketing_version", StatsSetupConst.ROW_COUNT,
-          StatsSetupConst.RAW_DATA_SIZE, StatsSetupConst.TOTAL_SIZE, StatsSetupConst.NUM_FILES, "numFilesErasureCoded");
-
-  private static TestHiveShell shell;
-
-  private static final List<Type> SUPPORTED_TYPES =
-      ImmutableList.of(Types.BooleanType.get(), Types.IntegerType.get(), Types.LongType.get(),
-          Types.FloatType.get(), Types.DoubleType.get(), Types.DateType.get(), Types.TimestampType.withZone(),
-          Types.TimestampType.withoutZone(), Types.StringType.get(), Types.BinaryType.get(),
-          Types.DecimalType.of(3, 1));
-
-  private TestTables testTables;
-
-  public abstract TestTables testTables(Configuration conf, TemporaryFolder tmp) throws IOException;
-
-  @Parameters(name = "fileFormat={0}, engine={1}")
-  public static Collection<Object[]> parameters() {
-    Collection<Object[]> testParams = new ArrayList<>();
-    testParams.add(new Object[] { FileFormat.PARQUET, "mr" });
-    testParams.add(new Object[] { FileFormat.ORC, "mr" });
-    testParams.add(new Object[] { FileFormat.AVRO, "mr" });
-
-    // include Tez tests only for Java 8
-    String javaVersion = System.getProperty("java.specification.version");
-    if (javaVersion.equals("1.8")) {
-      testParams.add(new Object[] { FileFormat.PARQUET, "tez" });
-      testParams.add(new Object[] { FileFormat.ORC, "tez" });
-      testParams.add(new Object[] { FileFormat.AVRO, "tez" });
-    }
-    return testParams;
-  }
-
-  @Parameter(0)
-  public FileFormat fileFormat;
-
-  @Parameter(1)
-  public String executionEngine;
-
-  @BeforeClass
-  public static void beforeClass() {
-    shell = new TestHiveShell();
-    shell.setHiveConfValue("hive.notification.event.poll.interval", "-1");
-    shell.setHiveConfValue("hive.tez.exec.print.summary", "true");
-    shell.start();
-  }
-
-  @AfterClass
-  public static void afterClass() {
-    shell.stop();
-  }
-
-  @Before
-  public void before() throws IOException {
-    shell.openSession();
-    testTables = testTables(shell.metastore().hiveConf(), temp);
-    for (Map.Entry<String, String> property : testTables.properties().entrySet()) {
-      shell.setHiveSessionValue(property.getKey(), property.getValue());
-    }
-    shell.setHiveSessionValue("hive.execution.engine", executionEngine);
-    shell.setHiveSessionValue("hive.jar.directory", temp.getRoot().getAbsolutePath());
-    shell.setHiveSessionValue("tez.staging-dir", temp.getRoot().getAbsolutePath());
-  }
-
-  @After
-  public void after() throws Exception {
-    shell.closeSession();
-    shell.metastore().reset();
-    // HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the
-    // HiveServer2 is stopped. Only Finalizer closes the HMS connections.
-    System.gc();
-  }
-
-  @Test
-  public void testScanEmptyTable() throws IOException {
-    Schema emptySchema = new Schema(required(1, "empty", Types.StringType.get()));
-    createTable("empty", emptySchema, ImmutableList.of());
-
-    List<Object[]> rows = shell.executeStatement("SELECT * FROM default.empty");
-    Assert.assertEquals(0, rows.size());
-  }
-
-  @Test
-  public void testScanTable() throws IOException {
-    createTable("customers", CUSTOMER_SCHEMA, CUSTOMER_RECORDS);
-
-    // Single fetch task: no MR job.
-    List<Object[]> rows = shell.executeStatement("SELECT * FROM default.customers");
-
-    Assert.assertEquals(3, rows.size());
-    Assert.assertArrayEquals(new Object[] {0L, "Alice"}, rows.get(0));
-    Assert.assertArrayEquals(new Object[] {1L, "Bob"}, rows.get(1));
-    Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, rows.get(2));
-
-    // Adding the ORDER BY clause will cause Hive to spawn a local MR job this time.
-    List<Object[]> descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC");
-
-    Assert.assertEquals(3, descRows.size());
-    Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0));
-    Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1));
-    Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2));
-  }
-
-  @Test
-  public void testJoinTables() throws IOException {
-    createTable("customers", CUSTOMER_SCHEMA, CUSTOMER_RECORDS);
-    createTable("orders", ORDER_SCHEMA, ORDER_RECORDS);
-
-    List<Object[]> rows = shell.executeStatement(
-        "SELECT c.customer_id, c.first_name, o.order_id, o.total " +
-            "FROM default.customers c JOIN default.orders o ON c.customer_id = o.customer_id " +
-            "ORDER BY c.customer_id, o.order_id"
-    );
-
-    Assert.assertArrayEquals(new Object[] {0L, "Alice", 100L, 11.11d}, rows.get(0));
-    Assert.assertArrayEquals(new Object[] {0L, "Alice", 101L, 22.22d}, rows.get(1));
-    Assert.assertArrayEquals(new Object[] {1L, "Bob", 102L, 33.33d}, rows.get(2));
-  }
-
-  @Test
-  public void testDecimalTableWithPredicateLiterals() throws IOException {
-    Schema schema = new Schema(required(1, "decimal_field", Types.DecimalType.of(7, 2)));
-    List<Record> records = TestHelper.RecordsBuilder.newInstance(schema)
-        .add(new BigDecimal("85.00"))
-        .add(new BigDecimal("100.56"))
-        .add(new BigDecimal("100.57"))
-        .build();
-    createTable("dec_test", schema, records);
-
-    // Use integer literal in predicate
-    List<Object[]> rows = shell.executeStatement("SELECT * FROM default.dec_test where decimal_field >= 85");
-    Assert.assertEquals(3, rows.size());
-    Assert.assertArrayEquals(new Object[] {"85.00"}, rows.get(0));
-    Assert.assertArrayEquals(new Object[] {"100.56"}, rows.get(1));
-    Assert.assertArrayEquals(new Object[] {"100.57"}, rows.get(2));
-
-    // Use decimal literal in predicate with smaller scale than schema type definition
-    rows = shell.executeStatement("SELECT * FROM default.dec_test where decimal_field > 99.1");
-    Assert.assertEquals(2, rows.size());
-    Assert.assertArrayEquals(new Object[] {"100.56"}, rows.get(0));
-    Assert.assertArrayEquals(new Object[] {"100.57"}, rows.get(1));
-
-    // Use decimal literal in predicate with higher scale than schema type definition
-    rows = shell.executeStatement("SELECT * FROM default.dec_test where decimal_field > 100.565");
-    Assert.assertEquals(1, rows.size());
-    Assert.assertArrayEquals(new Object[] {"100.57"}, rows.get(0));
-
-    // Use decimal literal in predicate with the same scale as schema type definition
-    rows = shell.executeStatement("SELECT * FROM default.dec_test where decimal_field > 640.34");
-    Assert.assertEquals(0, rows.size());
-  }
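// A rough programmatic equivalent of the decimal predicates above, assuming the standard
// org.apache.iceberg.expressions API (illustrative sketch only, not part of this change):
//
//   import java.math.BigDecimal;
//   import org.apache.iceberg.expressions.Expressions;
//
//   // matches "decimal_field > 99.1" once the literal is scaled to the column type decimal(7, 2)
//   table.newScan().filter(Expressions.greaterThan("decimal_field", new BigDecimal("99.10")));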
-
-  @Test
-  public void testJoinTablesSupportedTypes() throws IOException {
-    for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
-      Type type = SUPPORTED_TYPES.get(i);
-      String tableName = type.typeId().toString().toLowerCase() + "_table_" + i;
-      String columnName = type.typeId().toString().toLowerCase() + "_column";
-
-      Schema schema = new Schema(required(1, columnName, type));
-      List<Record> records = TestHelper.generateRandomRecords(schema, 1, 0L);
-
-      createTable(tableName, schema, records);
-      List<Object[]> queryResult = shell.executeStatement("select s." + columnName + ", h." + columnName +
-          " from default." + tableName + " s join default." + tableName + " h on h." + columnName + "=s." +
-          columnName);
-      Assert.assertEquals("Non matching record count for table " + tableName + " with type " + type,
-          1, queryResult.size());
-    }
-  }
-
-  @Test
-  public void testSelectDistinctFromTable() throws IOException {
-    for (int i = 0; i < SUPPORTED_TYPES.size(); i++) {
-      Type type = SUPPORTED_TYPES.get(i);
-      String tableName = type.typeId().toString().toLowerCase() + "_table_" + i;
-      String columnName = type.typeId().toString().toLowerCase() + "_column";
-
-      Schema schema = new Schema(required(1, columnName, type));
-      List<Record> records = TestHelper.generateRandomRecords(schema, 4, 0L);
-      int size = records.stream().map(r -> r.getField(columnName)).collect(Collectors.toSet()).size();
-      createTable(tableName, schema, records);
-      List<Object[]> queryResult = shell.executeStatement("select count(distinct(" + columnName +
-          ")) from default." + tableName);
-      int distinctIds = ((Long) queryResult.get(0)[0]).intValue();
-      Assert.assertEquals(tableName, size, distinctIds);
-    }
-  }
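// The CREATE TABLE statements below embed the Iceberg schema in TBLPROPERTIES via
// SchemaParser.toJson(). For CUSTOMER_SCHEMA the property value looks approximately like this
// (field layout shown for illustration; exact serialization may vary by Iceberg version):
//
//   {"type":"struct","fields":[
//     {"id":1,"name":"customer_id","required":false,"type":"long"},
//     {"id":2,"name":"first_name","required":false,"type":"string"}]}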
-
-  @Test
-  public void testCreateDropTable() throws TException, IOException, InterruptedException {
-    TableIdentifier identifier = TableIdentifier.of("default", "customers");
-
-    shell.executeStatement("CREATE EXTERNAL TABLE customers " +
-        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier) +
-        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " +
-        "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(IDENTITY_SPEC) + "', " +
-        "'dummy'='test')");
-
-    // Check the Iceberg table data
-    org.apache.iceberg.Table icebergTable = loadTable(identifier);
-    Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct());
-    Assert.assertEquals(IDENTITY_SPEC, icebergTable.spec());
-
-    // Check the HMS table parameters
-    org.apache.hadoop.hive.metastore.api.Table hmsTable =
-        shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-
-    Map<String, String> hmsParams = hmsTable.getParameters();
-    IGNORED_PARAMS.forEach(hmsParams::remove);
-
-    // This is only set for HiveCatalog based tables. Check the value, then remove it so the other checks can be
-    // general
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-      Assert.assertTrue(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)
-          .startsWith(icebergTable.location()));
-      hmsParams.remove(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
-    }
-
-    // General metadata checks
-    Assert.assertEquals(6, hmsParams.size());
-    Assert.assertEquals("test", hmsParams.get("dummy"));
-    Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
-    Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
-    Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME));
-    Assert.assertEquals(HiveIcebergStorageHandler.class.getName(),
-        hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE));
-    Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(),
-        hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
-
-    if (!Catalogs.hiveCatalog(shell.getHiveConf())) {
-      Assert.assertEquals(Collections.singletonMap("dummy", "test"), icebergTable.properties());
-
-      shell.executeStatement("DROP TABLE customers");
-
-      // Check if the table was really dropped even from the Catalog
-      AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class,
-          "Table does not exist", () -> {
-            loadTable(identifier);
-          }
-      );
-    } else {
-      Map<String, String> expectedIcebergProperties = new HashMap<>(2);
-      expectedIcebergProperties.put("dummy", "test");
-      expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true");
-      Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());
-
-      // Check the HMS table parameters
-      hmsTable = shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-      Path hmsTableLocation = new Path(hmsTable.getSd().getLocation());
-
-      // Drop the table
-      shell.executeStatement("DROP TABLE customers");
-
-      // Check that we get an exception when trying to load the table
-      AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class,
-          "Table does not exist", () -> {
-            loadTable(identifier);
-          }
-      );
-
-      // Check if the files are removed
-      FileSystem fs = Util.getFs(hmsTableLocation, shell.getHiveConf());
-      if (fs.exists(hmsTableLocation)) {
-        // if table directory has been deleted, we're good. This is the expected behavior in Hive4.
-        // if table directory exists, its contents should have been cleaned up, save for an empty metadata dir (Hive3).
-        Assert.assertEquals(1, fs.listStatus(hmsTableLocation).length);
-        Assert.assertEquals(0, fs.listStatus(new Path(hmsTableLocation, "metadata")).length);
-      }
-    }
-  }
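// Taken together, the assertions above pin down the HMS parameters written for a freshly created
// table (after IGNORED_PARAMS are filtered out) to roughly this map, with metadata_location
// present only for HiveCatalog-backed tables:
//
//   EXTERNAL              -> "TRUE"
//   external.table.purge  -> "TRUE"
//   dummy                 -> "test"
//   table_type            -> "ICEBERG"
//   storage_handler       -> "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"
//   transient_lastDdlTime -> <creation timestamp>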
-
-  @Test
-  public void testCreateTableWithoutSpec() throws TException, InterruptedException {
-    TableIdentifier identifier = TableIdentifier.of("default", "customers");
-
-    shell.executeStatement("CREATE EXTERNAL TABLE customers " +
-        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier) +
-        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')");
-
-    // Check the Iceberg table partition data
-    org.apache.iceberg.Table icebergTable = loadTable(identifier);
-    Assert.assertEquals(SPEC, icebergTable.spec());
-
-    // Check the HMS table parameters
-    org.apache.hadoop.hive.metastore.api.Table hmsTable =
-        shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-
-    Map<String, String> hmsParams = hmsTable.getParameters();
-    IGNORED_PARAMS.forEach(hmsParams::remove);
-
-    // Just check that the PartitionSpec is not set in the metadata
-    Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC));
-
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-      Assert.assertEquals(6, hmsParams.size());
-    } else {
-      Assert.assertEquals(5, hmsParams.size());
-    }
-  }
-
-  @Test
-  public void testCreateTableWithUnpartitionedSpec() throws TException, InterruptedException {
-    TableIdentifier identifier = TableIdentifier.of("default", "customers");
-
-    // We need the location for HadoopTable based tests only
-    shell.executeStatement("CREATE EXTERNAL TABLE customers " +
-        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier) +
-        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " +
-        "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(SPEC) + "')");
-
-    // Check the Iceberg table partition data
-    org.apache.iceberg.Table icebergTable = loadTable(identifier);
-    Assert.assertEquals(SPEC, icebergTable.spec());
-
-    // Check the HMS table parameters
-    org.apache.hadoop.hive.metastore.api.Table hmsTable =
-        shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-
-    Map<String, String> hmsParams = hmsTable.getParameters();
-    IGNORED_PARAMS.forEach(hmsParams::remove);
-
-    // Just check that the PartitionSpec is not set in the metadata
-    Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC));
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-      Assert.assertEquals(6, hmsParams.size());
-    } else {
-      Assert.assertEquals(5, hmsParams.size());
-    }
-  }
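// For reference, PartitionSpecParser.toJson() used in the DDL above serializes specs as JSON
// roughly like the following (first the unpartitioned spec, then the identity spec on
// customer_id; exact fields may vary by Iceberg version):
//
//   {"spec-id":0,"fields":[]}
//   {"spec-id":0,"fields":[{"name":"customer_id","transform":"identity","source-id":1,"field-id":1000}]}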
-
-  @Test
-  public void testDeleteBackingTable() throws TException, IOException, InterruptedException {
-    TableIdentifier identifier = TableIdentifier.of("default", "customers");
-
-    shell.executeStatement("CREATE EXTERNAL TABLE customers " +
-        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier) +
-        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " +
-        "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE')");
-
-    if (!Catalogs.hiveCatalog(shell.getHiveConf())) {
-      shell.executeStatement("DROP TABLE customers");
-
-      // Check if the table remains
-      loadTable(identifier);
-    } else {
-      // Check the HMS table parameters
-      org.apache.hadoop.hive.metastore.api.Table hmsTable =
-          shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-      Path hmsTableLocation = new Path(hmsTable.getSd().getLocation());
-
-      // Drop the table
-      shell.executeStatement("DROP TABLE customers");
-
-      // Check that we get an exception when trying to load the table
-      AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class,
-          "Table does not exist", () -> {
-            loadTable(identifier);
-          }
-      );
-
-      // Check if the files are kept
-      FileSystem fs = Util.getFs(hmsTableLocation, shell.getHiveConf());
-      Assert.assertEquals(1, fs.listStatus(hmsTableLocation).length);
-      Assert.assertEquals(1, fs.listStatus(new Path(hmsTableLocation, "metadata")).length);
-    }
-  }
-
-  @Test
-  public void testCreateTableError() {
-    TableIdentifier identifier = TableIdentifier.of("default", "withShell2");
-
-    // Wrong schema
-    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-        "Unrecognized token 'WrongSchema'", () -> {
-          shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
-              "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-              testTables.locationForCreateTableSQL(identifier) +
-              "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema')");
-        }
-    );
-
-    // Missing schema, we try to get the schema from the table and fail
-    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-        "Please provide ", () -> {
-          shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
-              "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-              testTables.locationForCreateTableSQL(identifier));
-        }
-    );
-
-    if (!testTables.locationForCreateTableSQL(identifier).isEmpty()) {
-      // Only test this if the location is required
-      AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-          "Table location not set", () -> {
-            shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
-                "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-                "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
-                SchemaParser.toJson(CUSTOMER_SCHEMA) + "')");
-          }
-      );
-    }
-  }
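// testCreateDropTable and testDeleteBackingTable above exercise both sides of the purge flag:
// with the default 'external.table.purge'='TRUE', DROP TABLE removes the data and metadata files,
// while with 'FALSE' the files survive the drop even though the table disappears from the catalog.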
-
-  @Test
-  public void testCreateTableAboveExistingTable() throws TException, IOException, InterruptedException {
-    // Create the Iceberg table
-    createIcebergTable("customers", COMPLEX_SCHEMA, Collections.emptyList());
-
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-
-      // In HiveCatalog we just expect an exception since the table already exists
-      AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-          "customers already exists", () -> {
-            shell.executeStatement("CREATE EXTERNAL TABLE customers " +
-                "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-                "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
-                SchemaParser.toJson(CUSTOMER_SCHEMA) + "')");
-          }
-      );
-    } else {
-      shell.executeStatement("CREATE EXTERNAL TABLE customers " +
-          "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-          testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")));
-
-      // Check the HMS table parameters
-      org.apache.hadoop.hive.metastore.api.Table hmsTable =
-          shell.metastore().clientPool().run(client -> client.getTable("default", "customers"));
-
-      Map<String, String> hmsParams = hmsTable.getParameters();
-      IGNORED_PARAMS.forEach(hmsParams::remove);
-
-      Assert.assertEquals(4, hmsParams.size());
-      Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
-      Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME));
-      Assert.assertEquals(HiveIcebergStorageHandler.class.getName(),
-          hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE));
-      Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(),
-          hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
-    }
-  }
-
-  @Test
-  public void testCreateTableWithColumnSpecification() throws IOException {
-    TableIdentifier identifier = TableIdentifier.of("default", "customers");
-
-    shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT, first_name STRING) " +
-        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier));
-
-    // Check the Iceberg table data
-    org.apache.iceberg.Table icebergTable = loadTable(identifier);
-    Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct());
-    Assert.assertEquals(SPEC, icebergTable.spec());
-
-    appendIcebergTable(icebergTable, fileFormat, null, CUSTOMER_RECORDS);
-
-    List<Object[]> descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC");
-
-    Assert.assertEquals(3, descRows.size());
-    Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0));
-    Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1));
-    Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2));
-  }
-
-  @Test
-  public void testCreateTableWithColumnSpecificationPartitioned() {
-    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-        "currently not supported", () -> {
-          shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT) " +
-              "PARTITIONED BY (first_name STRING) " +
-              "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-              testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")));
-        }
-    );
-  }
-
-  @Test
-  public void testCreatePartitionedTable() throws IOException {
-    TableIdentifier identifier = TableIdentifier.of("default", "customers");
-    PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("first_name").build();
-
-    shell.executeStatement("CREATE EXTERNAL TABLE customers " +
-        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier) +
-        "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " +
-        "'" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')");
-
-    org.apache.iceberg.Table icebergTable = loadTable(identifier);
-    Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct());
-    Assert.assertEquals(spec, icebergTable.spec());
-
-    appendIcebergTable(icebergTable, fileFormat, Row.of("Alice"), Arrays.asList(CUSTOMER_RECORDS.get(0)));
-    appendIcebergTable(icebergTable, fileFormat, Row.of("Bob"), Arrays.asList(CUSTOMER_RECORDS.get(1)));
-    appendIcebergTable(icebergTable, fileFormat, Row.of("Trudy"), Arrays.asList(CUSTOMER_RECORDS.get(2)));
-
-    List<Object[]> descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC");
-
-    Assert.assertEquals(3, descRows.size());
-    Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0));
-    Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1));
-    Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2));
-  }
-
-  @Test
-  public void testCreateTableWithColumnSpecificationHierarchy() {
-    TableIdentifier identifier = TableIdentifier.of("default", "customers");
-
-    shell.executeStatement("CREATE EXTERNAL TABLE customers (" +
-        "id BIGINT, name STRING, " +
-        "employee_info STRUCT < employer: STRING, id: BIGINT, address: STRING >, " +
-        "places_lived ARRAY < STRUCT <street: STRING, city: STRING, country: STRING >>, " +
-        "memorable_moments MAP < STRING, STRUCT < year: INT, place: STRING, details: STRING >>, " +
-        "current_address STRUCT < street_address: STRUCT " +
-        "<street_number: INT, street_name: STRING, street_type: STRING>, country: STRING, postal_code: STRING >) " +
-        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier));
-
-    // Check the Iceberg table data
-    org.apache.iceberg.Table icebergTable = loadTable(identifier);
-    Assert.assertEquals(COMPLEX_SCHEMA.asStruct(), icebergTable.schema().asStruct());
-  }
-
-  @Test
-  public void testCreateTableWithAllSupportedTypes() {
-    TableIdentifier identifier = TableIdentifier.of("default", "all_types");
-    Schema allSupportedSchema = new Schema(
-        optional(1, "t_float", Types.FloatType.get()),
-        optional(2, "t_double", Types.DoubleType.get()),
-        optional(3, "t_boolean", Types.BooleanType.get()),
-        optional(4, "t_int", Types.IntegerType.get()),
-        optional(5, "t_bigint", Types.LongType.get()),
-        optional(6, "t_binary", Types.BinaryType.get()),
-        optional(7, "t_string", Types.StringType.get()),
-        optional(8, "t_timestamp", Types.TimestampType.withoutZone()),
-        optional(9, "t_date", Types.DateType.get()),
-        optional(10, "t_decimal", Types.DecimalType.of(3, 2))
-    );
-
-    // Intentionally adding some mixed letters to test that we handle them correctly
-    shell.executeStatement("CREATE EXTERNAL TABLE all_types (" +
-        "t_Float FLOaT, t_dOuble DOUBLE, t_boolean BOOLEAN, t_int INT, t_bigint BIGINT, t_binary BINARY, " +
-        "t_string STRING, t_timestamp TIMESTAMP, t_date DATE, t_decimal DECIMAL(3,2)) " +
-        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier));
-
-    // Check the Iceberg table data
-    org.apache.iceberg.Table icebergTable = loadTable(identifier);
-    Assert.assertEquals(allSupportedSchema.asStruct(), icebergTable.schema().asStruct());
-  }
-
-  @Test
-  public void testCreateTableWithNotSupportedTypes() {
-    TableIdentifier identifier = TableIdentifier.of("default", "not_supported_types");
-    // Can not create INTERVAL types from normal create table, so leave them out from this test
-    String[] notSupportedTypes = new String[] { "TINYINT", "SMALLINT", "VARCHAR(1)", "CHAR(1)" };
-
-    for (String notSupportedType : notSupportedTypes) {
-      AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
-          "Unsupported Hive type", () -> {
-            shell.executeStatement("CREATE EXTERNAL TABLE not_supported_types " +
-                "(not_supported " + notSupportedType + ") " +
-                "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-                testTables.locationForCreateTableSQL(identifier));
-          }
-      );
-    }
-  }
-
-  protected void createTable(String tableName, Schema schema, List<Record> records)
-      throws IOException {
-    Table table = createIcebergTable(tableName, schema, records);
-    createHiveTable(tableName, table.location());
-  }
-
-  protected Table createIcebergTable(String tableName, Schema schema, List<Record> records)
-      throws IOException {
-    String identifier = testTables.identifier("default."
+ tableName); - TestHelper helper = new TestHelper( - shell.metastore().hiveConf(), testTables.tables(), identifier, schema, SPEC, fileFormat, temp); - Table table = helper.createTable(); - - if (!records.isEmpty()) { - helper.appendToTable(helper.writeFile(null, records)); - } - - return table; - } - - @Test - public void testArrayOfPrimitivesInTable() throws IOException { - Schema schema = - new Schema(required(1, "arrayofprimitives", Types.ListType.ofRequired(2, Types.IntegerType.get()))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "arraytable"); - // access a single element from the array - for (int i = 0; i < records.size(); i++) { - List expectedList = (List) records.get(i).getField("arrayofprimitives"); - for (int j = 0; j < expectedList.size(); j++) { - List queryResult = shell.executeStatement( - String.format("SELECT arrayofprimitives[%d] FROM default.arraytable " + "LIMIT 1 OFFSET %d", j, i)); - Assert.assertEquals(expectedList.get(j), queryResult.get(0)[0]); - } - } - } - - @Test - public void testArrayOfArraysInTable() throws IOException { - Schema schema = - new Schema( - required(1, "arrayofarrays", - Types.ListType.ofRequired(2, Types.ListType.ofRequired(3, Types.DateType.get())))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "arraytable"); - // access an element from a matrix - for (int i = 0; i < records.size(); i++) { - List expectedList = (List) records.get(i).getField("arrayofarrays"); - for (int j = 0; j < expectedList.size(); j++) { - List expectedInnerList = (List) expectedList.get(j); - for (int k = 0; k < expectedInnerList.size(); k++) { - List queryResult = shell.executeStatement( - String.format("SELECT arrayofarrays[%d][%d] FROM default.arraytable " + "LIMIT 1 OFFSET %d", - j, k, i)); - Assert.assertEquals(expectedInnerList.get(k).toString(), queryResult.get(0)[0]); - } - } - } - } - - @Test - public void testArrayOfMapsInTable() throws IOException { - Schema schema = - new Schema(required(1, "arrayofmaps", Types.ListType - .ofRequired(2, Types.MapType.ofRequired(3, 4, Types.StringType.get(), - Types.BooleanType.get())))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "arraytable"); - // access an element from a map in an array - for (int i = 0; i < records.size(); i++) { - List expectedList = (List) records.get(i).getField("arrayofmaps"); - for (int j = 0; j < expectedList.size(); j++) { - Map expectedMap = (Map) expectedList.get(j); - for (Map.Entry entry : expectedMap.entrySet()) { - List queryResult = shell.executeStatement(String - .format("SELECT arrayofmaps[%d][\"%s\"] FROM default.arraytable LIMIT 1 OFFSET %d", j, - entry.getKey(), i)); - Assert.assertEquals(entry.getValue(), queryResult.get(0)[0]); - } - } - } - } - - @Test - public void testArrayOfStructsInTable() throws IOException { - Schema schema = - new Schema( - required(1, "arrayofstructs", Types.ListType.ofRequired(2, Types.StructType - .of(required(3, "something", Types.DoubleType.get()), required(4, "someone", - Types.LongType.get()), required(5, "somewhere", Types.StringType.get()))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "arraytable"); - // access an element from a struct in an array - for (int i = 0; i < records.size(); i++) { - List expectedList = (List) records.get(i).getField("arrayofstructs"); - for (int j = 0; j < expectedList.size(); j++) { - List queryResult = shell.executeStatement(String.format("SELECT arrayofstructs[%d].something, " + - "arrayofstructs[%d].someone, arrayofstructs[%d].somewhere 
FROM default.arraytable LIMIT 1 " + - "OFFSET %d", j, j, j, i)); - GenericRecord genericRecord = (GenericRecord) expectedList.get(j); - Assert.assertEquals(genericRecord.getField("something"), queryResult.get(0)[0]); - Assert.assertEquals(genericRecord.getField("someone"), queryResult.get(0)[1]); - Assert.assertEquals(genericRecord.getField("somewhere"), queryResult.get(0)[2]); - } - } - } - - @Test - public void testMapOfPrimitivesInTable() throws IOException { - Schema schema = new Schema( - required(1, "mapofprimitives", Types.MapType.ofRequired(2, 3, Types.StringType.get(), - Types.IntegerType.get()))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "maptable"); - // access a single value from the map - for (int i = 0; i < records.size(); i++) { - Map expectedMap = (Map) records.get(i).getField("mapofprimitives"); - for (Map.Entry entry : expectedMap.entrySet()) { - List queryResult = shell.executeStatement(String - .format("SELECT mapofprimitives[\"%s\"] " + "FROM default.maptable LIMIT 1 OFFSET %d", entry.getKey(), - i)); - Assert.assertEquals(entry.getValue(), queryResult.get(0)[0]); - } - } - } - - @Test - public void testMapOfArraysInTable() throws IOException { - Schema schema = new Schema( - required(1, "mapofarrays", - Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.ListType.ofRequired(4, - Types.DateType.get())))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "maptable"); - // access a single element from a list in a map - for (int i = 0; i < records.size(); i++) { - Map expectedMap = (Map) records.get(i).getField("mapofarrays"); - for (Map.Entry entry : expectedMap.entrySet()) { - List expectedList = (List) entry.getValue(); - for (int j = 0; j < expectedList.size(); j++) { - List queryResult = shell.executeStatement(String - .format("SELECT mapofarrays[\"%s\"]" + "[%d] FROM maptable LIMIT 1 OFFSET %d", entry.getKey(), j, i)); - Assert.assertEquals(expectedList.get(j).toString(), queryResult.get(0)[0]); - } - } - } - } - - @Test - public void testMapOfMapsInTable() throws IOException { - Schema schema = new Schema( - required(1, "mapofmaps", Types.MapType.ofRequired(2, 3, Types.StringType.get(), - Types.MapType.ofRequired(4, 5, Types.StringType.get(), Types.StringType.get())))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "maptable"); - // access a single element from a map in a map - for (int i = 0; i < records.size(); i++) { - Map expectedMap = (Map) records.get(i).getField("mapofmaps"); - for (Map.Entry entry : expectedMap.entrySet()) { - Map expectedInnerMap = (Map) entry.getValue(); - for (Map.Entry innerEntry : expectedInnerMap.entrySet()) { - List queryResult = shell.executeStatement(String - .format("SELECT mapofmaps[\"%s\"]" + "[\"%s\"] FROM maptable LIMIT 1 OFFSET %d", entry.getKey(), - innerEntry.getKey(), i)); - Assert.assertEquals(innerEntry.getValue(), queryResult.get(0)[0]); - } - } - } - } - - @Test - public void testMapOfStructsInTable() throws IOException { - Schema schema = new Schema( - required(1, "mapofstructs", Types.MapType.ofRequired(2, 3, Types.StringType.get(), - Types.StructType.of(required(4, "something", Types.DoubleType.get()), - required(5, "someone", Types.LongType.get()), - required(6, "somewhere", Types.StringType.get()))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "maptable"); - // access a single element from a struct in a map - for (int i = 0; i < records.size(); i++) { - Map expectedMap = (Map) records.get(i).getField("mapofstructs"); - for 
(Map.Entry entry : expectedMap.entrySet()) { - List queryResult = shell.executeStatement(String.format("SELECT mapofstructs[\"%s\"].something, " + - "mapofstructs[\"%s\"].someone, mapofstructs[\"%s\"].somewhere FROM default.maptable LIMIT 1 " + - "OFFSET %d", entry.getKey(), entry.getKey(), entry.getKey(), i)); - GenericRecord genericRecord = (GenericRecord) entry.getValue(); - Assert.assertEquals(genericRecord.getField("something"), queryResult.get(0)[0]); - Assert.assertEquals(genericRecord.getField("someone"), queryResult.get(0)[1]); - Assert.assertEquals(genericRecord.getField("somewhere"), queryResult.get(0)[2]); - } - } - } - - @Test - public void testStructOfPrimitivesInTable() throws IOException { - Schema schema = new Schema(required(1, "structofprimitives", - Types.StructType.of(required(2, "key", Types.StringType.get()), required(3, "value", - Types.IntegerType.get())))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "structtable"); - // access a single value in a struct - for (int i = 0; i < records.size(); i++) { - GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofprimitives"); - List queryResult = shell.executeStatement(String.format( - "SELECT structofprimitives.key, structofprimitives.value FROM default.structtable LIMIT 1 OFFSET %d", i)); - Assert.assertEquals(expectedStruct.getField("key"), queryResult.get(0)[0]); - Assert.assertEquals(expectedStruct.getField("value"), queryResult.get(0)[1]); - } - } - - @Test - public void testStructOfArraysInTable() throws IOException { - Schema schema = new Schema( - required(1, "structofarrays", Types.StructType - .of(required(2, "names", Types.ListType.ofRequired(3, Types.StringType.get())), - required(4, "birthdays", Types.ListType.ofRequired(5, - Types.DateType.get()))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "structtable"); - // access an element of an array inside a struct - for (int i = 0; i < records.size(); i++) { - GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofarrays"); - List expectedList = (List) expectedStruct.getField("names"); - for (int j = 0; j < expectedList.size(); j++) { - List queryResult = shell.executeStatement( - String.format("SELECT structofarrays.names[%d] FROM default.structtable LIMIT 1 OFFSET %d", j, i)); - Assert.assertEquals(expectedList.get(j), queryResult.get(0)[0]); - } - expectedList = (List) expectedStruct.getField("birthdays"); - for (int j = 0; j < expectedList.size(); j++) { - List queryResult = shell.executeStatement( - String.format("SELECT structofarrays.birthdays[%d] FROM default.structtable LIMIT 1 OFFSET %d", j, i)); - Assert.assertEquals(expectedList.get(j).toString(), queryResult.get(0)[0]); - } - } - } - - @Test - public void testStructOfMapsInTable() throws IOException { - Schema schema = new Schema( - required(1, "structofmaps", Types.StructType - .of(required(2, "map1", Types.MapType.ofRequired(3, 4, - Types.StringType.get(), Types.StringType.get())), required(5, "map2", - Types.MapType.ofRequired(6, 7, Types.StringType.get(), - Types.IntegerType.get()))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "structtable"); - // access a map entry inside a struct - for (int i = 0; i < records.size(); i++) { - GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofmaps"); - Map expectedMap = (Map) expectedStruct.getField("map1"); - for (Map.Entry entry : expectedMap.entrySet()) { - List queryResult = shell.executeStatement(String - 
.format("SELECT structofmaps.map1[\"%s\"] from default.structtable LIMIT 1 OFFSET %d", entry.getKey(), - i)); - Assert.assertEquals(entry.getValue(), queryResult.get(0)[0]); - } - expectedMap = (Map) expectedStruct.getField("map2"); - for (Map.Entry entry : expectedMap.entrySet()) { - List queryResult = shell.executeStatement(String - .format("SELECT structofmaps.map2[\"%s\"] from default.structtable LIMIT 1 OFFSET %d", entry.getKey(), - i)); - Assert.assertEquals(entry.getValue(), queryResult.get(0)[0]); - } - } - } - - @Test - public void testStructOfStructsInTable() throws IOException { - Schema schema = new Schema( - required(1, "structofstructs", Types.StructType.of(required(2, "struct1", Types.StructType - .of(required(3, "key", Types.StringType.get()), required(4, "value", - Types.IntegerType.get())))))); - List records = createTableWithGeneratedRecords(schema, 1, 0L, "structtable"); - // access a struct element inside a struct - for (int i = 0; i < records.size(); i++) { - GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofstructs"); - GenericRecord expectedInnerStruct = (GenericRecord) expectedStruct.getField("struct1"); - List queryResult = shell.executeStatement(String.format( - "SELECT structofstructs.struct1.key, structofstructs.struct1.value FROM default.structtable " + - "LIMIT 1 OFFSET %d", i)); - Assert.assertEquals(expectedInnerStruct.getField("key"), queryResult.get(0)[0]); - Assert.assertEquals(expectedInnerStruct.getField("value"), queryResult.get(0)[1]); - } - } - - protected void appendIcebergTable(Table table, FileFormat format, StructLike partition, List records) - throws IOException { - TestHelper helper = new TestHelper( - shell.getHiveConf(), null, null, null, null, format, temp); - - helper.setTable(table); - if (!records.isEmpty()) { - helper.appendToTable(helper.writeFile(partition, records)); - } - } - - protected void createHiveTable(String tableName, String location) { - shell.executeStatement(String.format( - "CREATE TABLE default.%s " + - "STORED BY '%s' " + - "LOCATION '%s'", - tableName, HiveIcebergStorageHandler.class.getName(), location)); - } - - private Table loadTable(TableIdentifier identifier) { - Properties properties = new Properties(); - properties.put(Catalogs.NAME, identifier.toString()); - String expectedLocation = testTables.loadLocation(identifier); - if (expectedLocation != null) { - properties.put(Catalogs.LOCATION, expectedLocation); - } - - // Check the Iceberg table data - return Catalogs.loadTable(shell.getHiveConf(), properties); - } - - protected String locationForCreateTableSQL(TemporaryFolder root, String tableName) { - return "LOCATION '" + root.getRoot().getPath() + "/hadoop/warehouse/default/" + tableName + "' "; - } - - protected String loadLocation(TemporaryFolder root, String tableName) { - return null; - } - - private List createTableWithGeneratedRecords(Schema schema, int numRecords, long seed, String tableName) - throws IOException { - List records = TestHelper.generateRandomRecords(schema, numRecords, seed); - createTable(tableName, schema, records); - return records; - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithCustomCatalog.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithCustomCatalog.java deleted file mode 100644 index b0f78bbbe8fd..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithCustomCatalog.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.junit.rules.TemporaryFolder; - -public class TestHiveIcebergStorageHandlerWithCustomCatalog extends HiveIcebergStorageHandlerBaseTest { - - @Override - public TestTables testTables(Configuration conf, TemporaryFolder temp) throws IOException { - return new TestTables.CustomCatalogTestTables(conf, temp); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopCatalog.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopCatalog.java deleted file mode 100644 index 22ade0f2dc5c..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopCatalog.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.junit.rules.TemporaryFolder; - -public class TestHiveIcebergStorageHandlerWithHadoopCatalog extends HiveIcebergStorageHandlerBaseTest { - - @Override - public TestTables testTables(Configuration conf, TemporaryFolder temp) throws IOException { - return new TestTables.HadoopCatalogTestTables(conf, temp); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java deleted file mode 100644 index 9781dbefbe61..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import org.apache.hadoop.conf.Configuration; -import org.junit.rules.TemporaryFolder; - -public class TestHiveIcebergStorageHandlerWithHadoopTables extends HiveIcebergStorageHandlerBaseTest { - - @Override - public TestTables testTables(Configuration conf, TemporaryFolder temp) { - return new TestTables.HadoopTestTables(conf, temp); - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java deleted file mode 100644 index 22e34744a1fe..000000000000 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import org.apache.hadoop.conf.Configuration; -import org.junit.rules.TemporaryFolder; - -public class TestHiveIcebergStorageHandlerWithHiveCatalog extends HiveIcebergStorageHandlerBaseTest { - - @Override - public TestTables testTables(Configuration conf, TemporaryFolder temp) { - return new TestTables.HiveTestTables(conf, temp); - } - - @Override - protected void createHiveTable(String tableName, String location) { - // The Hive catalog has already created the Hive table so there's no need to issue another - // 'CREATE TABLE ...' statement. - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithLocalMR.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithLocalMR.java new file mode 100644 index 000000000000..56dd20c90cb5 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithLocalMR.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.runners.Parameterized.Parameter;
+import static org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergStorageHandlerWithLocalMR {
+  private static final FileFormat[] fileFormats =
+      new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET};
+
+  private static final Schema CUSTOMER_SCHEMA = new Schema(
+      optional(1, "customer_id", Types.LongType.get()),
+      optional(2, "first_name", Types.StringType.get())
+  );
+
+  private static final List<Record> CUSTOMER_RECORDS = TestHelper.RecordsBuilder.newInstance(CUSTOMER_SCHEMA)
+      .add(0L, "Alice")
+      .add(1L, "Bob")
+      .add(2L, "Trudy")
+      .build();
+
+  @Parameters(name = "fileFormat={0}, catalog={1}")
+  public static Collection<Object[]> parameters() {
+    Collection<Object[]> testParams = new ArrayList<>();
+    for (FileFormat fileFormat : fileFormats) {
+      for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) {
+        testParams.add(new Object[] {fileFormat, testTableType});
+      }
+    }
+
+    return testParams;
+  }
+
+  @Parameter(0)
+  public FileFormat fileFormat;
+
+  @Parameter(1)
+  public TestTables.TestTableType testTableType;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static TestHiveShell shell;
+
+  private TestTables testTables;
+
+  @BeforeClass
+  public static void beforeClass() {
+    shell = new TestHiveShell();
+    shell.setHiveConfValue("hive.notification.event.poll.interval", "-1");
+    shell.start();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    shell.stop();
+  }
+
+  @Before
+  public void before() throws IOException {
+    shell.openSession();
+    testTables = testTableType.instance(shell.metastore().hiveConf(), temp);
+    for (Map.Entry<String, String> property : testTables.properties().entrySet()) {
+      shell.setHiveSessionValue(property.getKey(), property.getValue());
+    }
+    // Spark is not supported, but these tests should not use it anyway. Setting to a wrong value to detect misplaced
Setting to a wrong value to detect misplaced + // tests + shell.setHiveSessionValue("hive.execution.engine", "spark"); + } + + @After + public void after() throws Exception { + shell.closeSession(); + shell.metastore().reset(); + } + + @Test + public void testScanEmptyTable() throws IOException { + Schema emptySchema = new Schema(required(1, "empty", Types.StringType.get())); + testTables.createTable(shell, "empty", emptySchema, fileFormat, ImmutableList.of()); + + List rows = shell.executeStatement("SELECT * FROM default.empty"); + Assert.assertEquals(0, rows.size()); + } + + @Test + public void testScanTable() throws IOException { + testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS); + + // Single fetch task: no MR job. + List rows = shell.executeStatement("SELECT * FROM default.customers"); + + Assert.assertEquals(3, rows.size()); + Assert.assertArrayEquals(new Object[]{0L, "Alice"}, rows.get(0)); + Assert.assertArrayEquals(new Object[]{1L, "Bob"}, rows.get(1)); + Assert.assertArrayEquals(new Object[]{2L, "Trudy"}, rows.get(2)); + } + + @Test + public void testDecimalTableWithPredicateLiterals() throws IOException { + Schema schema = new Schema(required(1, "decimal_field", Types.DecimalType.of(7, 2))); + List records = TestHelper.RecordsBuilder.newInstance(schema) + .add(new BigDecimal("85.00")) + .add(new BigDecimal("100.56")) + .add(new BigDecimal("100.57")) + .build(); + testTables.createTable(shell, "dec_test", schema, fileFormat, records); + + // Use integer literal in predicate + List rows = shell.executeStatement("SELECT * FROM default.dec_test where decimal_field >= 85"); + Assert.assertEquals(3, rows.size()); + Assert.assertArrayEquals(new Object[] {"85.00"}, rows.get(0)); + Assert.assertArrayEquals(new Object[] {"100.56"}, rows.get(1)); + Assert.assertArrayEquals(new Object[] {"100.57"}, rows.get(2)); + + // Use decimal literal in predicate with smaller scale than schema type definition + rows = shell.executeStatement("SELECT * FROM default.dec_test where decimal_field > 99.1"); + Assert.assertEquals(2, rows.size()); + Assert.assertArrayEquals(new Object[] {"100.56"}, rows.get(0)); + Assert.assertArrayEquals(new Object[] {"100.57"}, rows.get(1)); + + // Use decimal literal in predicate with higher scale than schema type definition + rows = shell.executeStatement("SELECT * FROM default.dec_test where decimal_field > 100.565"); + Assert.assertEquals(1, rows.size()); + Assert.assertArrayEquals(new Object[] {"100.57"}, rows.get(0)); + + // Use decimal literal in predicate with the same scale as schema type definition + rows = shell.executeStatement("SELECT * FROM default.dec_test where decimal_field > 640.34"); + Assert.assertEquals(0, rows.size()); + } + + @Test + public void testArrayOfPrimitivesInTable() throws IOException { + Schema schema = + new Schema(required(1, "arrayofprimitives", Types.ListType.ofRequired(2, Types.IntegerType.get()))); + List records = testTables.createTableWithGeneratedRecords(shell, "arraytable", schema, fileFormat, 1, 0L); + // access a single element from the array + for (int i = 0; i < records.size(); i++) { + List expectedList = (List) records.get(i).getField("arrayofprimitives"); + for (int j = 0; j < expectedList.size(); j++) { + List queryResult = shell.executeStatement( + String.format("SELECT arrayofprimitives[%d] FROM default.arraytable " + "LIMIT 1 OFFSET %d", j, i)); + Assert.assertEquals(expectedList.get(j), queryResult.get(0)[0]); + } + } + } + + @Test + public void testArrayOfArraysInTable() throws 
IOException { + Schema schema = + new Schema( + required(1, "arrayofarrays", + Types.ListType.ofRequired(2, Types.ListType.ofRequired(3, Types.DateType.get())))); + List records = testTables.createTableWithGeneratedRecords(shell, "arraytable", schema, fileFormat, 1, 0L); + // access an element from a matrix + for (int i = 0; i < records.size(); i++) { + List expectedList = (List) records.get(i).getField("arrayofarrays"); + for (int j = 0; j < expectedList.size(); j++) { + List expectedInnerList = (List) expectedList.get(j); + for (int k = 0; k < expectedInnerList.size(); k++) { + List queryResult = shell.executeStatement( + String.format("SELECT arrayofarrays[%d][%d] FROM default.arraytable " + "LIMIT 1 OFFSET %d", + j, k, i)); + Assert.assertEquals(expectedInnerList.get(k).toString(), queryResult.get(0)[0]); + } + } + } + } + + @Test + public void testArrayOfMapsInTable() throws IOException { + Schema schema = + new Schema(required(1, "arrayofmaps", Types.ListType + .ofRequired(2, Types.MapType.ofRequired(3, 4, Types.StringType.get(), + Types.BooleanType.get())))); + List records = testTables.createTableWithGeneratedRecords(shell, "arraytable", schema, fileFormat, 1, 0L); + // access an element from a map in an array + for (int i = 0; i < records.size(); i++) { + List expectedList = (List) records.get(i).getField("arrayofmaps"); + for (int j = 0; j < expectedList.size(); j++) { + Map expectedMap = (Map) expectedList.get(j); + for (Map.Entry entry : expectedMap.entrySet()) { + List queryResult = shell.executeStatement(String + .format("SELECT arrayofmaps[%d][\"%s\"] FROM default.arraytable LIMIT 1 OFFSET %d", j, + entry.getKey(), i)); + Assert.assertEquals(entry.getValue(), queryResult.get(0)[0]); + } + } + } + } + + @Test + public void testArrayOfStructsInTable() throws IOException { + Schema schema = + new Schema( + required(1, "arrayofstructs", Types.ListType.ofRequired(2, Types.StructType + .of(required(3, "something", Types.DoubleType.get()), required(4, "someone", + Types.LongType.get()), required(5, "somewhere", Types.StringType.get()))))); + List records = testTables.createTableWithGeneratedRecords(shell, "arraytable", schema, fileFormat, 1, 0L); + // access an element from a struct in an array + for (int i = 0; i < records.size(); i++) { + List expectedList = (List) records.get(i).getField("arrayofstructs"); + for (int j = 0; j < expectedList.size(); j++) { + List queryResult = shell.executeStatement(String.format("SELECT arrayofstructs[%d].something, " + + "arrayofstructs[%d].someone, arrayofstructs[%d].somewhere FROM default.arraytable LIMIT 1 " + + "OFFSET %d", j, j, j, i)); + GenericRecord genericRecord = (GenericRecord) expectedList.get(j); + Assert.assertEquals(genericRecord.getField("something"), queryResult.get(0)[0]); + Assert.assertEquals(genericRecord.getField("someone"), queryResult.get(0)[1]); + Assert.assertEquals(genericRecord.getField("somewhere"), queryResult.get(0)[2]); + } + } + } + + @Test + public void testMapOfPrimitivesInTable() throws IOException { + Schema schema = new Schema( + required(1, "mapofprimitives", Types.MapType.ofRequired(2, 3, Types.StringType.get(), + Types.IntegerType.get()))); + List records = testTables.createTableWithGeneratedRecords(shell, "maptable", schema, fileFormat, 1, 0L); + // access a single value from the map + for (int i = 0; i < records.size(); i++) { + Map expectedMap = (Map) records.get(i).getField("mapofprimitives"); + for (Map.Entry entry : expectedMap.entrySet()) { + List queryResult = shell.executeStatement(String + 
.format("SELECT mapofprimitives[\"%s\"] " + "FROM default.maptable LIMIT 1 OFFSET %d", entry.getKey(), + i)); + Assert.assertEquals(entry.getValue(), queryResult.get(0)[0]); + } + } + } + + @Test + public void testMapOfArraysInTable() throws IOException { + Schema schema = new Schema( + required(1, "mapofarrays", + Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.ListType.ofRequired(4, + Types.DateType.get())))); + List records = testTables.createTableWithGeneratedRecords(shell, "maptable", schema, fileFormat, 1, 0L); + // access a single element from a list in a map + for (int i = 0; i < records.size(); i++) { + Map expectedMap = (Map) records.get(i).getField("mapofarrays"); + for (Map.Entry entry : expectedMap.entrySet()) { + List expectedList = (List) entry.getValue(); + for (int j = 0; j < expectedList.size(); j++) { + List queryResult = shell.executeStatement(String + .format("SELECT mapofarrays[\"%s\"]" + "[%d] FROM maptable LIMIT 1 OFFSET %d", entry.getKey(), j, i)); + Assert.assertEquals(expectedList.get(j).toString(), queryResult.get(0)[0]); + } + } + } + } + + @Test + public void testMapOfMapsInTable() throws IOException { + Schema schema = new Schema( + required(1, "mapofmaps", Types.MapType.ofRequired(2, 3, Types.StringType.get(), + Types.MapType.ofRequired(4, 5, Types.StringType.get(), Types.StringType.get())))); + List records = testTables.createTableWithGeneratedRecords(shell, "maptable", schema, fileFormat, 1, 0L); + // access a single element from a map in a map + for (int i = 0; i < records.size(); i++) { + Map expectedMap = (Map) records.get(i).getField("mapofmaps"); + for (Map.Entry entry : expectedMap.entrySet()) { + Map expectedInnerMap = (Map) entry.getValue(); + for (Map.Entry innerEntry : expectedInnerMap.entrySet()) { + List queryResult = shell.executeStatement(String + .format("SELECT mapofmaps[\"%s\"]" + "[\"%s\"] FROM maptable LIMIT 1 OFFSET %d", entry.getKey(), + innerEntry.getKey(), i)); + Assert.assertEquals(innerEntry.getValue(), queryResult.get(0)[0]); + } + } + } + } + + @Test + public void testMapOfStructsInTable() throws IOException { + Schema schema = new Schema( + required(1, "mapofstructs", Types.MapType.ofRequired(2, 3, Types.StringType.get(), + Types.StructType.of(required(4, "something", Types.DoubleType.get()), + required(5, "someone", Types.LongType.get()), + required(6, "somewhere", Types.StringType.get()))))); + List records = testTables.createTableWithGeneratedRecords(shell, "maptable", schema, fileFormat, 1, 0L); + // access a single element from a struct in a map + for (int i = 0; i < records.size(); i++) { + Map expectedMap = (Map) records.get(i).getField("mapofstructs"); + for (Map.Entry entry : expectedMap.entrySet()) { + List queryResult = shell.executeStatement(String.format("SELECT mapofstructs[\"%s\"].something, " + + "mapofstructs[\"%s\"].someone, mapofstructs[\"%s\"].somewhere FROM default.maptable LIMIT 1 " + + "OFFSET %d", entry.getKey(), entry.getKey(), entry.getKey(), i)); + GenericRecord genericRecord = (GenericRecord) entry.getValue(); + Assert.assertEquals(genericRecord.getField("something"), queryResult.get(0)[0]); + Assert.assertEquals(genericRecord.getField("someone"), queryResult.get(0)[1]); + Assert.assertEquals(genericRecord.getField("somewhere"), queryResult.get(0)[2]); + } + } + } + + @Test + public void testStructOfPrimitivesInTable() throws IOException { + Schema schema = new Schema(required(1, "structofprimitives", + Types.StructType.of(required(2, "key", Types.StringType.get()), required(3, "value", + 
Types.IntegerType.get())))); + List records = testTables.createTableWithGeneratedRecords(shell, "structtable", schema, fileFormat, 1, 0L); + // access a single value in a struct + for (int i = 0; i < records.size(); i++) { + GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofprimitives"); + List queryResult = shell.executeStatement(String.format( + "SELECT structofprimitives.key, structofprimitives.value FROM default.structtable LIMIT 1 OFFSET %d", i)); + Assert.assertEquals(expectedStruct.getField("key"), queryResult.get(0)[0]); + Assert.assertEquals(expectedStruct.getField("value"), queryResult.get(0)[1]); + } + } + + @Test + public void testStructOfArraysInTable() throws IOException { + Schema schema = new Schema( + required(1, "structofarrays", Types.StructType + .of(required(2, "names", Types.ListType.ofRequired(3, Types.StringType.get())), + required(4, "birthdays", Types.ListType.ofRequired(5, + Types.DateType.get()))))); + List records = testTables.createTableWithGeneratedRecords(shell, "structtable", schema, fileFormat, 1, 0L); + // access an element of an array inside a struct + for (int i = 0; i < records.size(); i++) { + GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofarrays"); + List expectedList = (List) expectedStruct.getField("names"); + for (int j = 0; j < expectedList.size(); j++) { + List queryResult = shell.executeStatement( + String.format("SELECT structofarrays.names[%d] FROM default.structtable LIMIT 1 OFFSET %d", j, i)); + Assert.assertEquals(expectedList.get(j), queryResult.get(0)[0]); + } + expectedList = (List) expectedStruct.getField("birthdays"); + for (int j = 0; j < expectedList.size(); j++) { + List queryResult = shell.executeStatement( + String.format("SELECT structofarrays.birthdays[%d] FROM default.structtable LIMIT 1 OFFSET %d", j, i)); + Assert.assertEquals(expectedList.get(j).toString(), queryResult.get(0)[0]); + } + } + } + + @Test + public void testStructOfMapsInTable() throws IOException { + Schema schema = new Schema( + required(1, "structofmaps", Types.StructType + .of(required(2, "map1", Types.MapType.ofRequired(3, 4, + Types.StringType.get(), Types.StringType.get())), required(5, "map2", + Types.MapType.ofRequired(6, 7, Types.StringType.get(), + Types.IntegerType.get()))))); + List records = testTables.createTableWithGeneratedRecords(shell, "structtable", schema, fileFormat, 1, 0L); + // access a map entry inside a struct + for (int i = 0; i < records.size(); i++) { + GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofmaps"); + Map expectedMap = (Map) expectedStruct.getField("map1"); + for (Map.Entry entry : expectedMap.entrySet()) { + List queryResult = shell.executeStatement(String + .format("SELECT structofmaps.map1[\"%s\"] from default.structtable LIMIT 1 OFFSET %d", entry.getKey(), + i)); + Assert.assertEquals(entry.getValue(), queryResult.get(0)[0]); + } + expectedMap = (Map) expectedStruct.getField("map2"); + for (Map.Entry entry : expectedMap.entrySet()) { + List queryResult = shell.executeStatement(String + .format("SELECT structofmaps.map2[\"%s\"] from default.structtable LIMIT 1 OFFSET %d", entry.getKey(), + i)); + Assert.assertEquals(entry.getValue(), queryResult.get(0)[0]); + } + } + } + + @Test + public void testStructOfStructsInTable() throws IOException { + Schema schema = new Schema( + required(1, "structofstructs", Types.StructType.of(required(2, "struct1", Types.StructType + .of(required(3, "key", Types.StringType.get()), required(4, 
"value", + Types.IntegerType.get())))))); + List records = testTables.createTableWithGeneratedRecords(shell, "structtable", schema, fileFormat, 1, 0L); + // access a struct element inside a struct + for (int i = 0; i < records.size(); i++) { + GenericRecord expectedStruct = (GenericRecord) records.get(i).getField("structofstructs"); + GenericRecord expectedInnerStruct = (GenericRecord) expectedStruct.getField("struct1"); + List queryResult = shell.executeStatement(String.format( + "SELECT structofstructs.struct1.key, structofstructs.struct1.value FROM default.structtable " + + "LIMIT 1 OFFSET %d", i)); + Assert.assertEquals(expectedInnerStruct.getField("key"), queryResult.get(0)[0]); + Assert.assertEquals(expectedInnerStruct.getField("value"), queryResult.get(0)[1]); + } + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleEngines.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleEngines.java new file mode 100644 index 000000000000..89f2e99c0aa8 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleEngines.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.TestHelpers.Row; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.TestHelper; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.junit.runners.Parameterized.Parameter; +import static org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TestHiveIcebergStorageHandlerWithMultipleEngines { + private static final FileFormat[] fileFormats = + new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}; + + private static final String[] executionEngines = new String[] {"mr", "tez"}; + + private static final Schema CUSTOMER_SCHEMA = new Schema( + optional(1, "customer_id", Types.LongType.get()), + optional(2, "first_name", Types.StringType.get()) + ); + + private static final List CUSTOMER_RECORDS = TestHelper.RecordsBuilder.newInstance(CUSTOMER_SCHEMA) + .add(0L, "Alice") + .add(1L, "Bob") + .add(2L, "Trudy") + .build(); + + private static final Schema ORDER_SCHEMA = new Schema( + required(1, "order_id", Types.LongType.get()), + required(2, "customer_id", Types.LongType.get()), + required(3, "total", Types.DoubleType.get())); + + private static final List ORDER_RECORDS = TestHelper.RecordsBuilder.newInstance(ORDER_SCHEMA) + .add(100L, 0L, 11.11d) + .add(101L, 0L, 22.22d) + .add(102L, 1L, 33.33d) + .build(); + + private static final List SUPPORTED_TYPES = + ImmutableList.of(Types.BooleanType.get(), Types.IntegerType.get(), Types.LongType.get(), + Types.FloatType.get(), Types.DoubleType.get(), Types.DateType.get(), Types.TimestampType.withZone(), + Types.TimestampType.withoutZone(), Types.StringType.get(), Types.BinaryType.get(), + Types.DecimalType.of(3, 1)); + + @Parameters(name = "fileFormat={0}, engine={1}, catalog={2}") + public static Collection parameters() { + String javaVersion = System.getProperty("java.specification.version"); + + Collection testParams = new ArrayList<>(); + for (FileFormat fileFormat : fileFormats) { + for (String engine : executionEngines) { + // include Tez tests only for Java 8, but always include the MR tests + if (javaVersion.equals("1.8") || "mr".equals(engine)) { + for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) { + testParams.add(new Object[] {fileFormat, engine, testTableType}); + } + } + } + } + + return testParams; + } + + @Parameter(0) + public FileFormat fileFormat; + + @Parameter(1) + public String executionEngine; + + @Parameter(2) + public TestTables.TestTableType testTableType; + + @Rule + 
public TemporaryFolder temp = new TemporaryFolder(); + + private static TestHiveShell shell; + + private TestTables testTables; + + @BeforeClass + public static void beforeClass() { + shell = new TestHiveShell(); + shell.setHiveConfValue("hive.notification.event.poll.interval", "-1"); + shell.start(); + } + + @AfterClass + public static void afterClass() { + shell.stop(); + } + + @Before + public void before() throws IOException { + shell.openSession(); + testTables = testTableType.instance(shell.metastore().hiveConf(), temp); + for (Map.Entry property : testTables.properties().entrySet()) { + shell.setHiveSessionValue(property.getKey(), property.getValue()); + } + shell.setHiveSessionValue("hive.execution.engine", executionEngine); + shell.setHiveSessionValue("hive.jar.directory", temp.getRoot().getAbsolutePath()); + shell.setHiveSessionValue("tez.staging-dir", temp.getRoot().getAbsolutePath()); + } + + @After + public void after() throws Exception { + shell.closeSession(); + shell.metastore().reset(); + } + + @Test + public void testScanTable() throws IOException { + testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS); + + // Adding an ORDER BY clause will cause Hive to spawn an actual execution job (MR or Tez) instead of a fetch task. + List descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC"); + + Assert.assertEquals(3, descRows.size()); + Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0)); + Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1)); + Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2)); + } + + @Test + public void testJoinTables() throws IOException { + testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS); + testTables.createTable(shell, "orders", ORDER_SCHEMA, fileFormat, ORDER_RECORDS); + + List rows = shell.executeStatement( + "SELECT c.customer_id, c.first_name, o.order_id, o.total " + + "FROM default.customers c JOIN default.orders o ON c.customer_id = o.customer_id " + + "ORDER BY c.customer_id, o.order_id" + ); + + Assert.assertArrayEquals(new Object[] {0L, "Alice", 100L, 11.11d}, rows.get(0)); + Assert.assertArrayEquals(new Object[] {0L, "Alice", 101L, 22.22d}, rows.get(1)); + Assert.assertArrayEquals(new Object[] {1L, "Bob", 102L, 33.33d}, rows.get(2)); + } + + @Test + public void testJoinTablesSupportedTypes() throws IOException { + for (int i = 0; i < SUPPORTED_TYPES.size(); i++) { + Type type = SUPPORTED_TYPES.get(i); + String tableName = type.typeId().toString().toLowerCase() + "_table_" + i; + String columnName = type.typeId().toString().toLowerCase() + "_column"; + + Schema schema = new Schema(required(1, columnName, type)); + List records = TestHelper.generateRandomRecords(schema, 1, 0L); + + testTables.createTable(shell, tableName, schema, fileFormat, records); + List queryResult = shell.executeStatement("select s." + columnName + ", h." + columnName + + " from default." + tableName + " s join default." + tableName + " h on h." + columnName + "=s." 
+ + columnName); + Assert.assertEquals("Non-matching record count for table " + tableName + " with type " + type, + 1, queryResult.size()); + } + } + + @Test + public void testSelectDistinctFromTable() throws IOException { + for (int i = 0; i < SUPPORTED_TYPES.size(); i++) { + Type type = SUPPORTED_TYPES.get(i); + String tableName = type.typeId().toString().toLowerCase() + "_table_" + i; + String columnName = type.typeId().toString().toLowerCase() + "_column"; + + Schema schema = new Schema(required(1, columnName, type)); + List records = TestHelper.generateRandomRecords(schema, 4, 0L); + int size = records.stream().map(r -> r.getField(columnName)).collect(Collectors.toSet()).size(); + testTables.createTable(shell, tableName, schema, fileFormat, records); + List queryResult = shell.executeStatement("select count(distinct(" + columnName + + ")) from default." + tableName); + int distinctIds = ((Long) queryResult.get(0)[0]).intValue(); + Assert.assertEquals(tableName, size, distinctIds); + } + } + + @Test + public void testCreateTableWithColumnSpecification() throws IOException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT, first_name STRING) " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier)); + + // Check the Iceberg table data + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); + Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); + Assert.assertEquals(PartitionSpec.unpartitioned(), icebergTable.spec()); + + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, null, CUSTOMER_RECORDS); + + List descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC"); + + Assert.assertEquals(3, descRows.size()); + Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0)); + Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1)); + Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2)); + } + + @Test + public void testCreatePartitionedTable() throws IOException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + PartitionSpec spec = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("first_name").build(); + + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " + + "'" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); + + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); + Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); + Assert.assertEquals(spec, icebergTable.spec()); + + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Alice"), + Arrays.asList(CUSTOMER_RECORDS.get(0))); + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Bob"), + Arrays.asList(CUSTOMER_RECORDS.get(1))); + testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, Row.of("Trudy"), + Arrays.asList(CUSTOMER_RECORDS.get(2))); + + List descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC"); + + Assert.assertEquals(3, 
descRows.size()); + Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0)); + Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1)); + Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2)); + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithoutEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithoutEngine.java new file mode 100644 index 000000000000..cc37a9faa43a --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithoutEngine.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.mr.Catalogs; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.types.Types; +import org.apache.thrift.TException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.junit.runners.Parameterized.Parameter; +import static org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TestHiveIcebergStorageHandlerWithoutEngine { + + private static final Schema CUSTOMER_SCHEMA = new Schema( + optional(1, "customer_id", Types.LongType.get()), + optional(2, "first_name", Types.StringType.get()) + ); + + private static final PartitionSpec UNPARTITIONED = PartitionSpec.unpartitioned(); + + private static final PartitionSpec IDENTITY_SPEC = + 
PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("customer_id").build(); + + private static final Schema COMPLEX_SCHEMA = new Schema( + optional(1, "id", Types.LongType.get()), + optional(2, "name", Types.StringType.get()), + optional(3, "employee_info", Types.StructType.of( + optional(7, "employer", Types.StringType.get()), + optional(8, "id", Types.LongType.get()), + optional(9, "address", Types.StringType.get()) + )), + optional(4, "places_lived", Types.ListType.ofOptional(10, Types.StructType.of( + optional(11, "street", Types.StringType.get()), + optional(12, "city", Types.StringType.get()), + optional(13, "country", Types.StringType.get()) + ))), + optional(5, "memorable_moments", Types.MapType.ofOptional(14, 15, + Types.StringType.get(), + Types.StructType.of( + optional(16, "year", Types.IntegerType.get()), + optional(17, "place", Types.StringType.get()), + optional(18, "details", Types.StringType.get()) + ))), + optional(6, "current_address", Types.StructType.of( + optional(19, "street_address", Types.StructType.of( + optional(22, "street_number", Types.IntegerType.get()), + optional(23, "street_name", Types.StringType.get()), + optional(24, "street_type", Types.StringType.get()) + )), + optional(20, "country", Types.StringType.get()), + optional(21, "postal_code", Types.StringType.get()) + )) + ); + + private static final Set IGNORED_PARAMS = + ImmutableSet.of("bucketing_version", StatsSetupConst.ROW_COUNT, + StatsSetupConst.RAW_DATA_SIZE, StatsSetupConst.TOTAL_SIZE, StatsSetupConst.NUM_FILES, "numFilesErasureCoded"); + + @Parameters(name = "catalog={0}") + public static Collection parameters() { + Collection testParams = new ArrayList<>(); + for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) { + testParams.add(new Object[] {testTableType}); + } + + return testParams; + } + + @Parameter(0) + public TestTables.TestTableType testTableType; + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private static TestHiveShell shell; + + private TestTables testTables; + + @BeforeClass + public static void beforeClass() { + shell = new TestHiveShell(); + shell.setHiveConfValue("hive.notification.event.poll.interval", "-1"); + shell.setHiveConfValue("hive.tez.exec.print.summary", "true"); + shell.start(); + } + + @AfterClass + public static void afterClass() { + shell.stop(); + } + + @Before + public void before() throws IOException { + shell.openSession(); + testTables = testTableType.instance(shell.metastore().hiveConf(), temp); + for (Map.Entry property : testTables.properties().entrySet()) { + shell.setHiveSessionValue(property.getKey(), property.getValue()); + } + } + + @After + public void after() throws Exception { + shell.closeSession(); + shell.metastore().reset(); + // HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the + // HiveServer2 is stopped. Only Finalizer closes the HMS connections. 
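+ // Hinting a GC here makes the finalizers run and release those dangling HMS connections before the next test.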
+ System.gc(); + } + + @Test + public void testCreateDropTable() throws TException, IOException, InterruptedException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " + + "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(IDENTITY_SPEC) + "', " + + "'dummy'='test')"); + + // Check the Iceberg table data + Table icebergTable = testTables.loadTable(identifier); + Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); + Assert.assertEquals(IDENTITY_SPEC, icebergTable.spec()); + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); + + Map hmsParams = hmsTable.getParameters(); + IGNORED_PARAMS.forEach(hmsParams::remove); + + // This is only set for HiveCatalog-based tables. Check the value, then remove it so the other checks can be general + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertTrue(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) + .startsWith(icebergTable.location())); + hmsParams.remove(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + } + + // General metadata checks + Assert.assertEquals(6, hmsParams.size()); + Assert.assertEquals("test", hmsParams.get("dummy")); + Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE)); + Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL")); + Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME)); + Assert.assertEquals(HiveIcebergStorageHandler.class.getName(), + hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE)); + Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(), + hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); + + if (!Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertEquals(Collections.singletonMap("dummy", "test"), icebergTable.properties()); + + shell.executeStatement("DROP TABLE customers"); + + // Check if the table was really dropped even from the Catalog + AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, + "Table does not exist", () -> { + testTables.loadTable(identifier); + } + ); + } else { + Map expectedIcebergProperties = new HashMap<>(2); + expectedIcebergProperties.put("dummy", "test"); + expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true"); + Assert.assertEquals(expectedIcebergProperties, icebergTable.properties()); + + // Check the HMS table parameters + hmsTable = shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); + Path hmsTableLocation = new Path(hmsTable.getSd().getLocation()); + + // Drop the table + shell.executeStatement("DROP TABLE customers"); + + // Check that an exception is thrown when trying to load the table + AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, + "Table does not exist", () -> { + testTables.loadTable(identifier); + } + ); + + // Check if the files are removed + FileSystem fs = Util.getFs(hmsTableLocation, shell.getHiveConf()); + if (fs.exists(hmsTableLocation)) { + // if table directory has been 
deleted, we're good. This is the expected behavior in Hive4. + // if table directory exists, its contents should have been cleaned up, save for an empty metadata dir (Hive3). + Assert.assertEquals(1, fs.listStatus(hmsTableLocation).length); + Assert.assertEquals(0, fs.listStatus(new Path(hmsTableLocation, "metadata")).length); + } + } + } + + @Test + public void testCreateTableWithoutSpec() throws TException, InterruptedException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); + + // Check the Iceberg table partition data + Table icebergTable = testTables.loadTable(identifier); + Assert.assertEquals(UNPARTITIONED, icebergTable.spec()); + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); + + Map hmsParams = hmsTable.getParameters(); + IGNORED_PARAMS.forEach(hmsParams::remove); + + // Just check that the PartitionSpec is not set in the metadata + Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC)); + + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertEquals(6, hmsParams.size()); + } else { + Assert.assertEquals(5, hmsParams.size()); + } + } + + @Test + public void testCreateTableWithUnpartitionedSpec() throws TException, InterruptedException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + // We need the location for HadoopTable based tests only + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " + + "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(UNPARTITIONED) + "')"); + + // Check the Iceberg table partition data + Table icebergTable = testTables.loadTable(identifier); + Assert.assertEquals(UNPARTITIONED, icebergTable.spec()); + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); + + Map hmsParams = hmsTable.getParameters(); + IGNORED_PARAMS.forEach(hmsParams::remove); + + // Just check that the PartitionSpec is not set in the metadata + Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC)); + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertEquals(6, hmsParams.size()); + } else { + Assert.assertEquals(5, hmsParams.size()); + } + } + + @Test + public void testDeleteBackingTable() throws TException, IOException, InterruptedException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " + + "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE')"); + + if (!Catalogs.hiveCatalog(shell.getHiveConf())) { + shell.executeStatement("DROP TABLE 
customers"); + + // Check if the table remains + testTables.loadTable(identifier); + } else { + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); + Path hmsTableLocation = new Path(hmsTable.getSd().getLocation()); + + // Drop the table + shell.executeStatement("DROP TABLE customers"); + + // Check if we drop an exception when trying to drop the table + AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, + "Table does not exist", () -> { + testTables.loadTable(identifier); + } + ); + + // Check if the files are kept + FileSystem fs = Util.getFs(hmsTableLocation, shell.getHiveConf()); + Assert.assertEquals(1, fs.listStatus(hmsTableLocation).length); + Assert.assertEquals(1, fs.listStatus(new Path(hmsTableLocation, "metadata")).length); + } + } + + @Test + public void testCreateTableError() { + TableIdentifier identifier = TableIdentifier.of("default", "withShell2"); + + // Wrong schema + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Unrecognized token 'WrongSchema'", () -> { + shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema')"); + } + ); + + // Missing schema, we try to get the schema from the table and fail + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Please provide ", () -> { + shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier)); + } + ); + + if (!testTables.locationForCreateTableSQL(identifier).isEmpty()) { + // Only test this if the location is required + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Table location not set", () -> { + shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); + } + ); + } + } + + @Test + public void testCreateTableAboveExistingTable() throws TException, IOException, InterruptedException { + // Create the Iceberg table + testTables.createIcebergTable(shell.getHiveConf(), "customers", COMPLEX_SCHEMA, FileFormat.PARQUET, null); + + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + + // In HiveCatalog we just expect an exception since the table is already exists + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "customers already exists", () -> { + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); + } + ); + } else { + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers"))); + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + shell.metastore().clientPool().run(client -> client.getTable("default", "customers")); + + Map hmsParams 
= hmsTable.getParameters(); + IGNORED_PARAMS.forEach(hmsParams::remove); + + Assert.assertEquals(4, hmsParams.size()); + Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL")); + Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME)); + Assert.assertEquals(HiveIcebergStorageHandler.class.getName(), + hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE)); + Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(), + hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); + } + } + + @Test + public void testCreateTableWithColumnSpecificationPartitioned() { + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "currently not supported", () -> { + shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT) " + + "PARTITIONED BY (first_name STRING) " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers"))); + } + ); + } + + @Test + public void testCreateTableWithColumnSpecificationHierarchy() { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.executeStatement("CREATE EXTERNAL TABLE customers (" + + "id BIGINT, name STRING, " + + "employee_info STRUCT < employer: STRING, id: BIGINT, address: STRING >, " + + "places_lived ARRAY < STRUCT < street: STRING, city: STRING, country: STRING >>, " + + "memorable_moments MAP < STRING, STRUCT < year: INT, place: STRING, details: STRING >>, " + + "current_address STRUCT < street_address: STRUCT < street_number: INT, street_name: STRING, street_type: STRING >" + + ", country: STRING, postal_code: STRING >) " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier)); + + // Check the Iceberg table data + Table icebergTable = testTables.loadTable(identifier); + Assert.assertEquals(COMPLEX_SCHEMA.asStruct(), icebergTable.schema().asStruct()); + } + + @Test + public void testCreateTableWithAllSupportedTypes() { + TableIdentifier identifier = TableIdentifier.of("default", "all_types"); + Schema allSupportedSchema = new Schema( + optional(1, "t_float", Types.FloatType.get()), + optional(2, "t_double", Types.DoubleType.get()), + optional(3, "t_boolean", Types.BooleanType.get()), + optional(4, "t_int", Types.IntegerType.get()), + optional(5, "t_bigint", Types.LongType.get()), + optional(6, "t_binary", Types.BinaryType.get()), + optional(7, "t_string", Types.StringType.get()), + optional(8, "t_timestamp", Types.TimestampType.withoutZone()), + optional(9, "t_date", Types.DateType.get()), + optional(10, "t_decimal", Types.DecimalType.of(3, 2)) + ); + + // Intentionally using mixed-case letters in the DDL to test that we handle them correctly + shell.executeStatement("CREATE EXTERNAL TABLE all_types (" + + "t_Float FLOaT, t_dOuble DOUBLE, t_boolean BOOLEAN, t_int INT, t_bigint BIGINT, t_binary BINARY, " + + "t_string STRING, t_timestamp TIMESTAMP, t_date DATE, t_decimal DECIMAL(3,2)) " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier)); + + // Check the Iceberg table data + org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); + Assert.assertEquals(allSupportedSchema.asStruct(), icebergTable.schema().asStruct()); + } + + @Test + public void testCreateTableWithNotSupportedTypes() { + TableIdentifier identifier = TableIdentifier.of("default", "not_supported_types"); + // Cannot create INTERVAL types from a normal CREATE TABLE statement, so they are left out of this test + 
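// TINYINT, SMALLINT, VARCHAR and CHAR are expected to be rejected, since they have no exact Iceberg type counterpart + 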
String[] notSupportedTypes = new String[] { "TINYINT", "SMALLINT", "VARCHAR(1)", "CHAR(1)" }; + + for (String notSupportedType : notSupportedTypes) { + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Unsupported Hive type", () -> { + shell.executeStatement("CREATE EXTERNAL TABLE not_supported_types " + + "(not_supported " + notSupportedType + ") " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier)); + } + ); + } + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java index 5fcab6d36b95..743384a23c87 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java @@ -23,23 +23,26 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Collections; +import java.util.List; import java.util.Map; -import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.Tables; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hive.HiveCatalogs; import org.apache.iceberg.hive.MetastoreUtil; -import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.TestCatalogs; +import org.apache.iceberg.mr.TestHelper; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays; @@ -48,6 +51,12 @@ // Helper class for setting up and testing various catalog implementations abstract class TestTables { + public static final TestTableType[] ALL_TABLE_TYPES = new TestTableType[] { + TestTableType.HADOOP_TABLE, + TestTableType.HADOOP_CATALOG, + TestTableType.CUSTOM_CATALOG, + TestTableType.HIVE_CATALOG + }; private final Tables tables; protected final TemporaryFolder temp; @@ -83,12 +92,107 @@ public Tables tables() { public abstract String locationForCreateTableSQL(TableIdentifier identifier); /** - * If the {@link Catalogs#LOCATION} is needed for {@link Catalogs#loadTable(Configuration, Properties)} then this - * method should provide the location string. It should return null if the location is not needed. - * @param identifier The table identifier - * @return The location string for loadTable operation + * If the Hive table needs to be created independently for the given Catalog then this should return the Hive SQL + * string that needs to be executed. + * @param identifier The table identifier (the namespace should be "default") + * @return The SQL string that should be executed, or null if it is not needed. + */ + public String createHiveTableSQL(TableIdentifier identifier) { + return String.format("CREATE TABLE default.%s STORED BY '%s' %s", identifier.name(), + HiveIcebergStorageHandler.class.getName(), locationForCreateTableSQL(identifier)); + } + + /** + * Loads the given table from the actual catalog. 
Overridden by HadoopTables, since the parameter of the + {@link Tables#load(String)} should be the full path of the table metadata directory + * @param identifier The table we want to load + * @return The Table loaded from the Catalog */ - public abstract String loadLocation(TableIdentifier identifier); + public Table loadTable(TableIdentifier identifier) { + return tables.load(identifier.toString()); + } + + /** + * Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when + * needed. The table will be in the 'default' database. + * @param shell The HiveShell used for Hive table creation + * @param tableName The name of the test table + * @param schema The schema used for the table creation + * @param fileFormat The file format used for writing the data + * @param records The records with which the table is populated + * @throws IOException If there is an error writing data + */ + public void createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat, + List records) throws IOException { + createIcebergTable(shell.getHiveConf(), tableName, schema, fileFormat, records); + String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName)); + if (createHiveSQL != null) { + shell.executeStatement(createHiveSQL); + } + } + + /** + * Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when + * needed. The table will be in the 'default' database. + * @param shell The HiveShell used for Hive table creation + * @param tableName The name of the test table + * @param schema The schema used for the table creation + * @param fileFormat The file format used for writing the data + * @param numRecords The number of records to be generated and stored in the table + * @param seed The seed used for the random record generation + * @throws IOException If there is an error writing data + */ + public List createTableWithGeneratedRecords(TestHiveShell shell, String tableName, Schema schema, + FileFormat fileFormat, int numRecords, long seed) throws IOException { + List records = TestHelper.generateRandomRecords(schema, numRecords, seed); + createTable(shell, tableName, schema, fileFormat, records); + return records; + } + + /** + * Creates an Iceberg table/data without creating the corresponding Hive table. The table will be in the 'default' + * namespace. + * @param configuration The configuration used during the table creation + * @param tableName The name of the test table + * @param schema The schema used for the table creation + * @param fileFormat The file format used for writing the data + * @param records The records with which the table is populated + * @return The created table + * @throws IOException If there is an error writing data + */ + public Table createIcebergTable(Configuration configuration, String tableName, Schema schema, FileFormat fileFormat, + List records) throws IOException { + String identifier = identifier("default." + tableName); + TestHelper helper = new TestHelper(new Configuration(configuration), tables(), identifier, schema, + PartitionSpec.unpartitioned(), fileFormat, temp); + Table table = helper.createTable(); + + if (records != null && !records.isEmpty()) { + helper.appendToTable(helper.writeFile(null, records)); + } + + return table; + } + + /** + * Appends more data to the table. 
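+ * A minimal TestHelper is created with only the file format set, and the target table is attached to it via setTable().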
+ * @param configuration The configuration used during the data append + * @param table The table to append to + * @param format The file format used for writing the data + * @param partition The partition to write to + * @param records The records that should be added to the table + * @throws IOException If there is an error writing data + */ + public void appendIcebergTable(Configuration configuration, Table table, FileFormat format, StructLike partition, + List records) throws IOException { + TestHelper helper = new TestHelper( + configuration, null, null, null, null, format, temp); + + helper.setTable(table); + if (!records.isEmpty()) { + helper.appendToTable(helper.writeFile(partition, records)); + } + } private static class CatalogToTables implements Tables { @@ -142,10 +246,6 @@ public String locationForCreateTableSQL(TableIdentifier identifier) { return "LOCATION '" + warehouseLocation + TestTables.tablePath(identifier) + "' "; } - @Override - public String loadLocation(TableIdentifier identifier) { - return warehouseLocation + TestTables.tablePath(identifier); - } } static class HadoopCatalogTestTables extends TestTables { @@ -173,14 +273,9 @@ public Map properties() { public String locationForCreateTableSQL(TableIdentifier identifier) { return "LOCATION '" + warehouseLocation + TestTables.tablePath(identifier) + "' "; } - - public String loadLocation(TableIdentifier identifier) { - return null; - } } static class HadoopTestTables extends TestTables { - HadoopTestTables(Configuration conf, TemporaryFolder temp) { super(new HadoopTables(conf), temp); } @@ -206,9 +301,10 @@ public String locationForCreateTableSQL(TableIdentifier identifier) { } @Override - public String loadLocation(TableIdentifier identifier) { - return temp.getRoot().getPath() + TestTables.tablePath(identifier); + public Table loadTable(TableIdentifier identifier) { + return tables().load(temp.getRoot().getPath() + TestTables.tablePath(identifier)); + } + } static class HiveTestTables extends TestTables { @@ -227,7 +323,8 @@ public String locationForCreateTableSQL(TableIdentifier identifier) { return ""; } - public String loadLocation(TableIdentifier identifier) { + @Override + public String createHiveTableSQL(TableIdentifier identifier) { return null; } } @@ -235,4 +332,29 @@ public String loadLocation(TableIdentifier identifier) { private static String tablePath(TableIdentifier identifier) { return "/" + Joiner.on("/").join(identifier.namespace().levels()) + "/" + identifier.name(); } + + enum TestTableType { + HADOOP_TABLE { + public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) { + return new HadoopTestTables(conf, temporaryFolder); + } + }, + HADOOP_CATALOG { + public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException { + return new HadoopCatalogTestTables(conf, temporaryFolder); + } + }, + CUSTOM_CATALOG { + public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException { + return new CustomCatalogTestTables(conf, temporaryFolder); + } + }, + HIVE_CATALOG { + public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) { + return new HiveTestTables(conf, temporaryFolder); + } + }; + + public abstract TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException; + } }