diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java index 79cd3efee009..627ed41ad503 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java @@ -351,13 +351,17 @@ public TableBuilder buildTable(TableIdentifier identifier, Schema schema) { } private class HadoopCatalogTableBuilder extends BaseMetastoreCatalogTableBuilder { + private final String defaultLocation; + private HadoopCatalogTableBuilder(TableIdentifier identifier, Schema schema) { super(identifier, schema); + defaultLocation = defaultWarehouseLocation(identifier); } @Override public TableBuilder withLocation(String location) { - Preconditions.checkArgument(location == null, "Cannot set a custom location for a path-based table"); + Preconditions.checkArgument(location == null || location.equals(defaultLocation), + "Cannot set a custom location for a path-based table. Expected " + defaultLocation + " but got " + location); return this; } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 351994c93000..de07376188bc 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; @@ -89,22 +90,27 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) } // If the table does not exist collect data for table creation - String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA); - Preconditions.checkNotNull(schemaString, "Please provide a table schema"); - // Just check if it is parsable, and later use for partition specification parsing - Schema schema = SchemaParser.fromJson(schemaString); - - String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC); - if (specString != null) { - // Just check if it is parsable - PartitionSpecParser.fromJson(schema, specString); - } + // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the + // Iceberg schema and specification generated by the code + // - Partitioned Hive tables are currently not allowed + + Schema schema = schema(catalogProperties, hmsTable); + PartitionSpec spec = spec(schema, catalogProperties, hmsTable); + + catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema)); + catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec)); // Allow purging table data if the table is created now and not set otherwise if (hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE) == null) { hmsTable.getParameters().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "TRUE"); } + // If the table is not managed by Hive catalog then the location should be set + if (!Catalogs.hiveCatalog(conf)) { + Preconditions.checkArgument(hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null, + "Table location not set"); + } + // Remove creation related properties 
PARAMETERS_TO_REMOVE.forEach(hmsTable.getParameters()::remove); } @@ -167,7 +173,7 @@ public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, * @param hmsTable Table for which we are calculating the properties * @return The properties we can provide for Iceberg functions, like {@link Catalogs} */ - private Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) { + private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) { Properties properties = new Properties(); properties.putAll(hmsTable.getParameters()); @@ -185,4 +191,25 @@ private Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Tab return properties; } + + private static Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable) { + if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) { + return SchemaParser.fromJson(properties.getProperty(InputFormatConfig.TABLE_SCHEMA)); + } else { + return HiveSchemaUtil.schema(hmsTable.getSd().getCols()); + } + } + + private static PartitionSpec spec(Schema schema, Properties properties, + org.apache.hadoop.hive.metastore.api.Table hmsTable) { + + Preconditions.checkArgument(hmsTable.getPartitionKeys() == null || hmsTable.getPartitionKeys().isEmpty(), + "Partitioned Hive tables are currently not supported"); + + if (properties.getProperty(InputFormatConfig.PARTITION_SPEC) != null) { + return PartitionSpecParser.fromJson(schema, properties.getProperty(InputFormatConfig.PARTITION_SPEC)); + } else { + return PartitionSpec.unpartitioned(); + } + } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java index 8c8f91ec15dc..e665010131f7 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java @@ -22,20 +22,25 @@ import java.util.Properties; import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector; import org.apache.iceberg.mr.mapred.Container; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HiveIcebergSerDe extends AbstractSerDe { + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergSerDe.class); + private ObjectInspector inspector; @Override @@ -57,9 +62,12 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA)); } else { try { + // always prefer the original table schema if there is one tableSchema = Catalogs.loadTable(configuration, serDeProperties).schema(); - } catch (NoSuchTableException nte) { - throw new SerDeException("Please provide an existing table or a valid schema", nte); + LOG.info("Using schema from 
existing table {}", SchemaParser.toJson(tableSchema)); + } catch (Exception e) { + // If we can not load the table try the provided hive schema + tableSchema = hiveSchemaOrThrow(serDeProperties, e); } } @@ -94,4 +102,29 @@ public Object deserialize(Writable writable) { public ObjectInspector getObjectInspector() { return inspector; } + + /** + * Gets the hive schema from the serDeProperties, and throws an exception if it is not provided. In the later case + * it adds the previousException as a root cause. + * @param serDeProperties The source of the hive schema + * @param previousException If we had an exception previously + * @return The hive schema parsed from the serDeProperties + * @throws SerDeException If there is no schema information in the serDeProperties + */ + private static Schema hiveSchemaOrThrow(Properties serDeProperties, Exception previousException) + throws SerDeException { + // Read the configuration parameters + String columnNames = serDeProperties.getProperty(serdeConstants.LIST_COLUMNS); + String columnTypes = serDeProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES); + String columnNameDelimiter = serDeProperties.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? + serDeProperties.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA); + if (columnNames != null && columnTypes != null && columnNameDelimiter != null && + !columnNames.isEmpty() && !columnTypes.isEmpty() && !columnNameDelimiter.isEmpty()) { + Schema hiveSchema = HiveSchemaUtil.schema(columnNames, columnTypes, columnNameDelimiter); + LOG.info("Using hive schema {}", SchemaParser.toJson(hiveSchema)); + return hiveSchema; + } else { + throw new SerDeException("Please provide an existing table or a valid schema", previousException); + } + } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaConverter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaConverter.java new file mode 100644 index 000000000000..a99a924bf451 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaConverter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class HiveSchemaConverter {
+  private int id;
+
+  private HiveSchemaConverter() {
+    id = 0;
+  }
+
+  static Schema convert(List<String> names, List<TypeInfo> typeInfos) {
+    HiveSchemaConverter converter = new HiveSchemaConverter();
+    return new Schema(converter.convertInternal(names, typeInfos));
+  }
+
+  List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> typeInfos) {
+    List<Types.NestedField> result = new ArrayList<>(names.size());
+    for (int i = 0; i < names.size(); ++i) {
+      result.add(Types.NestedField.optional(id++, names.get(i), convert(typeInfos.get(i))));
+    }
+
+    return result;
+  }
+
+  Type convert(TypeInfo typeInfo) {
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+          case FLOAT:
+            return Types.FloatType.get();
+          case DOUBLE:
+            return Types.DoubleType.get();
+          case BOOLEAN:
+            return Types.BooleanType.get();
+          case BYTE:
+          case SHORT:
+            throw new IllegalArgumentException("Unsupported Hive type (" +
+                ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory() +
+                ") for Iceberg tables. Consider using INT/INTEGER type instead.");
+          case INT:
+            return Types.IntegerType.get();
+          case LONG:
+            return Types.LongType.get();
+          case BINARY:
+            return Types.BinaryType.get();
+          case CHAR:
+          case VARCHAR:
+            throw new IllegalArgumentException("Unsupported Hive type (" +
+                ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory() +
+                ") for Iceberg tables. Consider using STRING type instead.");
+          case STRING:
+            return Types.StringType.get();
+          case TIMESTAMP:
+            return Types.TimestampType.withoutZone();
+          case DATE:
+            return Types.DateType.get();
+          case DECIMAL:
+            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+            return Types.DecimalType.of(decimalTypeInfo.precision(), decimalTypeInfo.scale());
+          case INTERVAL_YEAR_MONTH:
+          case INTERVAL_DAY_TIME:
+          default:
+            throw new IllegalArgumentException("Unsupported Hive type (" +
+                ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory() +
+                ") for Iceberg tables.");
+        }
+      case STRUCT:
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        List<Types.NestedField> fields =
+            convertInternal(structTypeInfo.getAllStructFieldNames(), structTypeInfo.getAllStructFieldTypeInfos());
+        return Types.StructType.of(fields);
+      case MAP:
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+        Type keyType = convert(mapTypeInfo.getMapKeyTypeInfo());
+        Type valueType = convert(mapTypeInfo.getMapValueTypeInfo());
+        int keyId = id++;
+        int valueId = id++;
+        return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
+      case LIST:
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+        Type listType = convert(listTypeInfo.getListElementTypeInfo());
+        return Types.ListType.ofOptional(id++, listType);
+      case UNION:
+      default:
+        throw new IllegalArgumentException("Unknown type " + typeInfo.getCategory());
+    }
+  }
+}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaUtil.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaUtil.java
new file mode 100644
index 000000000000..457479f535bc
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveSchemaUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveSchemaUtil.class);
+
+  private HiveSchemaUtil() {
+  }
+
+  /**
+   * Converts the list of Hive FieldSchemas to an Iceberg schema.
+   * <p>
+ * The list should contain the columns and the partition columns as well. + * @param fieldSchemas The list of the columns + * @return An equivalent Iceberg Schema + */ + public static Schema schema(List fieldSchemas) { + List names = new ArrayList<>(fieldSchemas.size()); + List typeInfos = new ArrayList<>(fieldSchemas.size()); + + for (FieldSchema col : fieldSchemas) { + names.add(col.getName()); + typeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType())); + } + + return HiveSchemaConverter.convert(names, typeInfos); + } + + /** + * Converts the Hive properties defining the columns to an Iceberg schema. + * @param columnNames The property containing the column names + * @param columnTypes The property containing the column types + * @param columnNameDelimiter The name delimiter + * @return The Iceberg schema + */ + public static Schema schema(String columnNames, String columnTypes, String columnNameDelimiter) { + // Parse the configuration parameters + List names = new ArrayList<>(); + Collections.addAll(names, columnNames.split(columnNameDelimiter)); + + return HiveSchemaConverter.convert(names, TypeInfoUtils.getTypeInfosFromTypeString(columnTypes)); + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java index 8bad7a4239ee..1c92f7841115 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -42,8 +43,10 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestHelpers.Row; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; @@ -68,6 +71,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.junit.runners.Parameterized.Parameter; import static org.junit.runners.Parameterized.Parameters; @@ -79,8 +83,8 @@ public abstract class HiveIcebergStorageHandlerBaseTest { public TemporaryFolder temp = new TemporaryFolder(); private static final Schema CUSTOMER_SCHEMA = new Schema( - required(1, "customer_id", Types.LongType.get()), - required(2, "first_name", Types.StringType.get()) + optional(1, "customer_id", Types.LongType.get()), + optional(2, "first_name", Types.StringType.get()) ); private static final List CUSTOMER_RECORDS = TestHelper.RecordsBuilder.newInstance(CUSTOMER_SCHEMA) @@ -105,6 +109,37 @@ public abstract class HiveIcebergStorageHandlerBaseTest { private static final PartitionSpec IDENTITY_SPEC = PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("customer_id").build(); + private static final Schema COMPLEX_SCHEMA = new Schema( + optional(1, "id", Types.LongType.get()), + optional(2, "name", Types.StringType.get()), + optional(3, "employee_info", Types.StructType.of( + optional(7, "employer", Types.StringType.get()), + optional(8, "id", 
Types.LongType.get()), + optional(9, "address", Types.StringType.get()) + )), + optional(4, "places_lived", Types.ListType.ofOptional(10, Types.StructType.of( + optional(11, "street", Types.StringType.get()), + optional(12, "city", Types.StringType.get()), + optional(13, "country", Types.StringType.get()) + ))), + optional(5, "memorable_moments", Types.MapType.ofOptional(14, 15, + Types.StringType.get(), + Types.StructType.of( + optional(16, "year", Types.IntegerType.get()), + optional(17, "place", Types.StringType.get()), + optional(18, "details", Types.StringType.get()) + ))), + optional(6, "current_address", Types.StructType.of( + optional(19, "street_address", Types.StructType.of( + optional(22, "street_number", Types.IntegerType.get()), + optional(23, "street_name", Types.StringType.get()), + optional(24, "street_type", Types.StringType.get()) + )), + optional(20, "country", Types.StringType.get()), + optional(21, "postal_code", Types.StringType.get()) + )) + ); + private static final Set IGNORED_PARAMS = ImmutableSet.of("bucketing_version", StatsSetupConst.ROW_COUNT, StatsSetupConst.RAW_DATA_SIZE, StatsSetupConst.TOTAL_SIZE, StatsSetupConst.NUM_FILES, "numFilesErasureCoded"); @@ -296,23 +331,17 @@ public void testSelectDistinctFromTable() throws IOException { @Test public void testCreateDropTable() throws TException, IOException, InterruptedException { - // We need the location for HadoopTable based tests only - String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + shell.executeStatement("CREATE EXTERNAL TABLE customers " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - (location != null ? "LOCATION '" + location + "' " : "") + + testTables.locationForCreateTableSQL(identifier) + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " + "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(IDENTITY_SPEC) + "', " + "'dummy'='test')"); - Properties properties = new Properties(); - properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); - if (location != null) { - properties.put(Catalogs.LOCATION, location); - } - // Check the Iceberg table data - org.apache.iceberg.Table icebergTable = Catalogs.loadTable(shell.getHiveConf(), properties); + org.apache.iceberg.Table icebergTable = loadTable(identifier); Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); Assert.assertEquals(IDENTITY_SPEC, icebergTable.spec()); @@ -349,7 +378,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc // Check if the table was really dropped even from the Catalog AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, "Table does not exist", () -> { - Catalogs.loadTable(shell.getHiveConf(), properties); + loadTable(identifier); } ); } else { @@ -368,7 +397,7 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc // Check if we drop an exception when trying to load the table AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, "Table does not exist", () -> { - Catalogs.loadTable(shell.getHiveConf(), properties); + loadTable(identifier); } ); @@ -385,21 +414,15 @@ public void testCreateDropTable() throws TException, IOException, InterruptedExc @Test public void testCreateTableWithoutSpec() throws TException, InterruptedException { - // We need 
the location for HadoopTable based tests only - String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + shell.executeStatement("CREATE EXTERNAL TABLE customers " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - (location != null ? "LOCATION '" + location + "' " : "") + + testTables.locationForCreateTableSQL(identifier) + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); - Properties properties = new Properties(); - properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); - if (location != null) { - properties.put(Catalogs.LOCATION, location); - } - // Check the Iceberg table partition data - org.apache.iceberg.Table icebergTable = Catalogs.loadTable(shell.getHiveConf(), properties); + org.apache.iceberg.Table icebergTable = loadTable(identifier); Assert.assertEquals(SPEC, icebergTable.spec()); // Check the HMS table parameters @@ -421,22 +444,17 @@ public void testCreateTableWithoutSpec() throws TException, InterruptedException @Test public void testCreateTableWithUnpartitionedSpec() throws TException, InterruptedException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + // We need the location for HadoopTable based tests only - String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); shell.executeStatement("CREATE EXTERNAL TABLE customers " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - (location != null ? "LOCATION '" + location + "' " : "") + + testTables.locationForCreateTableSQL(identifier) + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " + "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(SPEC) + "')"); - Properties properties = new Properties(); - properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); - if (location != null) { - properties.put(Catalogs.LOCATION, location); - } - // Check the Iceberg table partition data - org.apache.iceberg.Table icebergTable = Catalogs.loadTable(shell.getHiveConf(), properties); + org.apache.iceberg.Table icebergTable = loadTable(identifier); Assert.assertEquals(SPEC, icebergTable.spec()); // Check the HMS table parameters @@ -457,25 +475,19 @@ public void testCreateTableWithUnpartitionedSpec() throws TException, Interrupte @Test public void testDeleteBackingTable() throws TException, IOException, InterruptedException { - // We need the location for HadoopTable based tests only - String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + shell.executeStatement("CREATE EXTERNAL TABLE customers " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - (location != null ? 
"LOCATION '" + location + "' " : "") + + testTables.locationForCreateTableSQL(identifier) + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " + "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE')"); - Properties properties = new Properties(); - properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); - if (location != null) { - properties.put(Catalogs.LOCATION, location); - } - if (!Catalogs.hiveCatalog(shell.getHiveConf())) { shell.executeStatement("DROP TABLE customers"); // Check if the table remains - Catalogs.loadTable(shell.getHiveConf(), properties); + loadTable(identifier); } else { // Check the HMS table parameters org.apache.hadoop.hive.metastore.api.Table hmsTable = @@ -488,7 +500,7 @@ public void testDeleteBackingTable() throws TException, IOException, Interrupted // Check if we drop an exception when trying to drop the table AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, "Table does not exist", () -> { - Catalogs.loadTable(shell.getHiveConf(), properties); + loadTable(identifier); } ); @@ -501,29 +513,29 @@ public void testDeleteBackingTable() throws TException, IOException, Interrupted @Test public void testCreateTableError() { - String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + TableIdentifier identifier = TableIdentifier.of("default", "withShell2"); // Wrong schema AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, "Unrecognized token 'WrongSchema'", () -> { shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - (location != null ? "LOCATION '" + location + "' " : "") + + testTables.locationForCreateTableSQL(identifier) + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema')"); } ); // Missing schema, we try to get the schema from the table and fail AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, - "Please provide an existing table or a valid schema", () -> { + "Please provide ", () -> { shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - (location != null ? 
"LOCATION '" + location + "' " : "")); + testTables.locationForCreateTableSQL(identifier)); } ); - if (location != null) { - // Missing location + if (!testTables.locationForCreateTableSQL(identifier).isEmpty()) { + // Only test this if the location is required AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, "Table location not set", () -> { shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " + @@ -538,7 +550,7 @@ public void testCreateTableError() { @Test public void testCreateTableAboveExistingTable() throws TException, IOException, InterruptedException { // Create the Iceberg table - createIcebergTable("customers", CUSTOMER_SCHEMA, Collections.emptyList()); + createIcebergTable("customers", COMPLEX_SCHEMA, Collections.emptyList()); if (Catalogs.hiveCatalog(shell.getHiveConf())) { @@ -552,18 +564,9 @@ public void testCreateTableAboveExistingTable() throws TException, IOException, } ); } else { - // We need the location for HadoopTable based tests only - String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); - shell.executeStatement("CREATE EXTERNAL TABLE customers " + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + - (location != null ? "LOCATION '" + location + "'" : "")); - - Properties properties = new Properties(); - properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); - if (location != null) { - properties.put(Catalogs.LOCATION, location); - } + testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers"))); // Check the HMS table parameters org.apache.hadoop.hive.metastore.api.Table hmsTable = @@ -582,6 +585,153 @@ public void testCreateTableAboveExistingTable() throws TException, IOException, } } + @Test + public void testCreateTableWithColumnSpecification() throws IOException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT, first_name STRING) " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier)); + + // Check the Iceberg table data + org.apache.iceberg.Table icebergTable = loadTable(identifier); + Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); + Assert.assertEquals(SPEC, icebergTable.spec()); + + appendIcebergTable(icebergTable, fileFormat, null, CUSTOMER_RECORDS); + + List descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC"); + + Assert.assertEquals(3, descRows.size()); + Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0)); + Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1)); + Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2)); + } + + @Test + public void testCreateTableWithColumnSpecificationPartitioned() { + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "currently not supported", () -> { + shell.executeStatement("CREATE EXTERNAL TABLE customers (customer_id BIGINT) " + + "PARTITIONED BY (first_name STRING) " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers"))); + } + ); + } + + @Test + public void testCreatePartitionedTable() throws IOException { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + PartitionSpec spec = 
PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("first_name").build(); + + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier) + + "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " + + "'" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); + + org.apache.iceberg.Table icebergTable = loadTable(identifier); + Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); + Assert.assertEquals(spec, icebergTable.spec()); + + appendIcebergTable(icebergTable, fileFormat, Row.of("Alice"), Arrays.asList(CUSTOMER_RECORDS.get(0))); + appendIcebergTable(icebergTable, fileFormat, Row.of("Bob"), Arrays.asList(CUSTOMER_RECORDS.get(1))); + appendIcebergTable(icebergTable, fileFormat, Row.of("Trudy"), Arrays.asList(CUSTOMER_RECORDS.get(2))); + + List descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC"); + + Assert.assertEquals(3, descRows.size()); + Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0)); + Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1)); + Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2)); + } + + @Test + public void testCreateTableWithColumnSpecificationHierarchy() { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.executeStatement("CREATE EXTERNAL TABLE customers (" + + "id BIGINT, name STRING, " + + "employee_info STRUCT < employer: STRING, id: BIGINT, address: STRING >, " + + "places_lived ARRAY < STRUCT >, " + + "memorable_moments MAP < STRING, STRUCT < year: INT, place: STRING, details: STRING >>, " + + "current_address STRUCT < street_address: STRUCT " + + ", country: STRING, postal_code: STRING >) " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier)); + + // Check the Iceberg table data + org.apache.iceberg.Table icebergTable = loadTable(identifier); + Assert.assertEquals(COMPLEX_SCHEMA.asStruct(), icebergTable.schema().asStruct()); + } + + @Test + public void testCreateTableWithAllSupportedTypes() { + TableIdentifier identifier = TableIdentifier.of("default", "all_types"); + Schema allSupportedSchema = new Schema( + optional(1, "t_float", Types.FloatType.get()), + optional(2, "t_double", Types.DoubleType.get()), + optional(3, "t_boolean", Types.BooleanType.get()), + optional(4, "t_int", Types.IntegerType.get()), + optional(5, "t_bigint", Types.LongType.get()), + optional(6, "t_binary", Types.BinaryType.get()), + optional(7, "t_string", Types.StringType.get()), + optional(8, "t_timestamp", Types.TimestampType.withoutZone()), + optional(9, "t_date", Types.DateType.get()), + optional(10, "t_decimal", Types.DecimalType.of(3, 2)) + ); + + // Intentionally adding some mixed letters to test that we handle them correctly + shell.executeStatement("CREATE EXTERNAL TABLE all_types (" + + "t_Float FLOaT, t_dOuble DOUBLE, t_boolean BOOLEAN, t_int INT, t_bigint BIGINT, t_binary BINARY, " + + "t_string STRING, t_timestamp TIMESTAMP, t_date DATE, t_decimal DECIMAL(3,2)) " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier)); + + // Check the Iceberg table data + org.apache.iceberg.Table icebergTable = loadTable(identifier); + Assert.assertEquals(allSupportedSchema.asStruct(), 
icebergTable.schema().asStruct()); + } + + @Test + public void testCreateTableWithNotSupportedTypes() { + TableIdentifier identifier = TableIdentifier.of("default", "not_supported_types"); + // Can not create INTERVAL types from normal create table, so leave them out from this test + String[] notSupportedTypes = new String[] { "TINYINT", "SMALLINT", "VARCHAR(1)", "CHAR(1)" }; + + for (String notSupportedType : notSupportedTypes) { + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Unsupported Hive type", () -> { + shell.executeStatement("CREATE EXTERNAL TABLE not_supported_types " + + "(not_supported " + notSupportedType + ") " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + testTables.locationForCreateTableSQL(identifier)); + } + ); + } + } + + protected void createTable(String tableName, Schema schema, List records) + throws IOException { + Table table = createIcebergTable(tableName, schema, records); + createHiveTable(tableName, table.location()); + } + + protected Table createIcebergTable(String tableName, Schema schema, List records) + throws IOException { + String identifier = testTables.identifier("default." + tableName); + TestHelper helper = new TestHelper( + shell.metastore().hiveConf(), testTables.tables(), identifier, schema, SPEC, fileFormat, temp); + Table table = helper.createTable(); + + if (!records.isEmpty()) { + helper.appendToTable(helper.writeFile(null, records)); + } + + return table; + } + @Test public void testArrayOfPrimitivesInTable() throws IOException { Schema schema = @@ -838,24 +988,15 @@ public void testStructOfStructsInTable() throws IOException { } } - protected void createTable(String tableName, Schema schema, List records) - throws IOException { - Table table = createIcebergTable(tableName, schema, records); - createHiveTable(tableName, table.location()); - } - - protected Table createIcebergTable(String tableName, Schema schema, List records) - throws IOException { - String identifier = testTables.identifier("default." 
+ tableName); + protected void appendIcebergTable(Table table, FileFormat format, StructLike partition, List records) + throws IOException { TestHelper helper = new TestHelper( - shell.metastore().hiveConf(), testTables.tables(), identifier, schema, SPEC, fileFormat, temp); - Table table = helper.createTable(); + shell.getHiveConf(), null, null, null, null, format, temp); + helper.setTable(table); if (!records.isEmpty()) { - helper.appendToTable(helper.writeFile(null, records)); + helper.appendToTable(helper.writeFile(partition, records)); } - - return table; } protected void createHiveTable(String tableName, String location) { @@ -866,7 +1007,23 @@ protected void createHiveTable(String tableName, String location) { tableName, HiveIcebergStorageHandler.class.getName(), location)); } - protected String locationForCreateTable(String tempDirName, String tableName) { + private Table loadTable(TableIdentifier identifier) { + Properties properties = new Properties(); + properties.put(Catalogs.NAME, identifier.toString()); + String expectedLocation = testTables.loadLocation(identifier); + if (expectedLocation != null) { + properties.put(Catalogs.LOCATION, expectedLocation); + } + + // Check the Iceberg table data + return Catalogs.loadTable(shell.getHiveConf(), properties); + } + + protected String locationForCreateTableSQL(TemporaryFolder root, String tableName) { + return "LOCATION '" + root.getRoot().getPath() + "/hadoop/warehouse/default/" + tableName + "' "; + } + + protected String loadLocation(TemporaryFolder root, String tableName) { return null; } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java index 8ca2a71ebe49..9781dbefbe61 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java @@ -28,9 +28,4 @@ public class TestHiveIcebergStorageHandlerWithHadoopTables extends HiveIcebergSt public TestTables testTables(Configuration conf, TemporaryFolder temp) { return new TestTables.HadoopTestTables(conf, temp); } - - @Override - protected String locationForCreateTable(String tempDirName, String tableName) { - return tempDirName + "/default/" + tableName; - } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveSchemaUtil.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveSchemaUtil.java new file mode 100644 index 000000000000..2372fccb7c75 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveSchemaUtil.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public class TestHiveSchemaUtil { + private static final Schema SIMPLE_SCHEMA = new Schema( + optional(0, "customer_id", Types.LongType.get()), + optional(1, "first_name", Types.StringType.get()) + ); + + private static final Schema COMPLEX_SCHEMA = new Schema( + optional(0, "id", Types.LongType.get()), + optional(1, "name", Types.StringType.get()), + optional(2, "employee_info", Types.StructType.of( + optional(3, "employer", Types.StringType.get()), + optional(4, "id", Types.LongType.get()), + optional(5, "address", Types.StringType.get()) + )), + optional(6, "places_lived", Types.ListType.ofOptional(10, Types.StructType.of( + optional(7, "street", Types.StringType.get()), + optional(8, "city", Types.StringType.get()), + optional(9, "country", Types.StringType.get()) + ))), + optional(11, "memorable_moments", Types.MapType.ofOptional(15, 16, + Types.StringType.get(), + Types.StructType.of( + optional(12, "year", Types.IntegerType.get()), + optional(13, "place", Types.StringType.get()), + optional(14, "details", Types.StringType.get()) + ))), + optional(17, "current_address", Types.StructType.of( + optional(18, "street_address", Types.StructType.of( + optional(19, "street_number", Types.IntegerType.get()), + optional(20, "street_name", Types.StringType.get()), + optional(21, "street_type", Types.StringType.get()) + )), + optional(22, "country", Types.StringType.get()), + optional(23, "postal_code", Types.StringType.get()) + )) + ); + + @Test + public void testSimpleSchemaConvert() { + Schema schema = HiveSchemaUtil.schema("customer_id,first_name", "bigint:string", ","); + Assert.assertEquals(SIMPLE_SCHEMA.asStruct(), schema.asStruct()); + } + + @Test + public void testSimpleSchemaConvertFromType() { + List fields = new ArrayList<>(); + fields.add(new FieldSchema("customer_id", serdeConstants.BIGINT_TYPE_NAME, "")); + fields.add(new FieldSchema("first_name", serdeConstants.STRING_TYPE_NAME, "")); + Schema schema = HiveSchemaUtil.schema(fields); + Assert.assertEquals(SIMPLE_SCHEMA.asStruct(), schema.asStruct()); + } + + @Test + public void testComplexSchemaConvert() { + Schema schema = HiveSchemaUtil.schema( + "id,name,employee_info,places_lived,memorable_moments,current_address", + "bigint:string:" + + "struct:" + + "array>:" + + "map>:" + + "struct," + + "country:string,postal_code:string>", + ","); + Assert.assertEquals(COMPLEX_SCHEMA.asStruct(), schema.asStruct()); + } + + @Test + public void testSchemaConvertForEveryPrimitiveType() { + Schema schemaWithEveryType = HiveSchemaUtil.schema(getSupportedFieldSchemas()); + Assert.assertEquals(getSchemaWithSupportedTypes().asStruct(), schemaWithEveryType.asStruct()); + } + + @Test + public void testNotSupportedTypes() { + for (FieldSchema notSupportedField : getNotSupportedFieldSchemas()) { + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Unsupported Hive type", () -> { + HiveSchemaUtil.schema(new ArrayList<>(Arrays.asList(notSupportedField))); + } + ); + } + } + + protected List getSupportedFieldSchemas() { + List fields = new 
ArrayList<>(); + fields.add(new FieldSchema("c_float", serdeConstants.FLOAT_TYPE_NAME, "")); + fields.add(new FieldSchema("c_double", serdeConstants.DOUBLE_TYPE_NAME, "")); + fields.add(new FieldSchema("c_boolean", serdeConstants.BOOLEAN_TYPE_NAME, "")); + fields.add(new FieldSchema("c_int", serdeConstants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema("c_long", serdeConstants.BIGINT_TYPE_NAME, "")); + fields.add(new FieldSchema("c_binary", serdeConstants.BINARY_TYPE_NAME, "")); + fields.add(new FieldSchema("c_string", serdeConstants.STRING_TYPE_NAME, "")); + fields.add(new FieldSchema("c_timestamp", serdeConstants.TIMESTAMP_TYPE_NAME, "")); + fields.add(new FieldSchema("c_date", serdeConstants.DATE_TYPE_NAME, "")); + fields.add(new FieldSchema("c_decimal", serdeConstants.DECIMAL_TYPE_NAME + "(38,10)", "")); + return fields; + } + + protected List getNotSupportedFieldSchemas() { + List fields = new ArrayList<>(); + fields.add(new FieldSchema("c_byte", serdeConstants.TINYINT_TYPE_NAME, "")); + fields.add(new FieldSchema("c_short", serdeConstants.SMALLINT_TYPE_NAME, "")); + fields.add(new FieldSchema("c_char", serdeConstants.CHAR_TYPE_NAME + "(5)", "")); + fields.add(new FieldSchema("c_varchar", serdeConstants.VARCHAR_TYPE_NAME + "(5)", "")); + fields.add(new FieldSchema("c_interval_date", serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME, "")); + fields.add(new FieldSchema("c_interval_time", serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME, "")); + return fields; + } + + protected Schema getSchemaWithSupportedTypes() { + return new Schema( + optional(0, "c_float", Types.FloatType.get()), + optional(1, "c_double", Types.DoubleType.get()), + optional(2, "c_boolean", Types.BooleanType.get()), + optional(3, "c_int", Types.IntegerType.get()), + optional(4, "c_long", Types.LongType.get()), + optional(5, "c_binary", Types.BinaryType.get()), + optional(6, "c_string", Types.StringType.get()), + optional(7, "c_timestamp", Types.TimestampType.withoutZone()), + optional(8, "c_date", Types.DateType.get()), + optional(9, "c_decimal", Types.DecimalType.of(38, 10))); + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java index db7642affa91..5fcab6d36b95 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java @@ -24,6 +24,7 @@ import java.io.UncheckedIOException; import java.util.Collections; import java.util.Map; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -35,8 +36,11 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hive.HiveCatalogs; +import org.apache.iceberg.hive.MetastoreUtil; +import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.TestCatalogs; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays; import org.junit.Assert; @@ -70,6 +74,21 @@ public Tables tables() { return tables; } + /** + * The location string needed to be provided for CREATE TABLE ... commands, + * like "LOCATION 'file:///tmp/warehouse/default/tablename'. Empty ("") if LOCATION is not needed. 
+ * @param identifier The table identifier + * @return The location string for create table operation + */ + public abstract String locationForCreateTableSQL(TableIdentifier identifier); + + /** + * If the {@link Catalogs#LOCATION} is needed for {@link Catalogs#loadTable(Configuration, Properties)} then this + * method should provide the location string. It should return null if the location is not needed. + * @param identifier The table identifier + * @return The location string for loadTable operation + */ + public abstract String loadLocation(TableIdentifier identifier); private static class CatalogToTables implements Tables { @@ -101,7 +120,8 @@ static class CustomCatalogTestTables extends TestTables { private final String warehouseLocation; CustomCatalogTestTables(Configuration conf, TemporaryFolder temp) throws IOException { - this(conf, temp, temp.newFolder("custom", "warehouse").toString()); + this(conf, temp, (MetastoreUtil.hive3PresentOnClasspath() ? "file:" : "") + + temp.newFolder("custom", "warehouse").toString()); } CustomCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation) { @@ -116,6 +136,16 @@ public Map properties() { TestCatalogs.CustomHadoopCatalog.WAREHOUSE_LOCATION, warehouseLocation ); } + + @Override + public String locationForCreateTableSQL(TableIdentifier identifier) { + return "LOCATION '" + warehouseLocation + TestTables.tablePath(identifier) + "' "; + } + + @Override + public String loadLocation(TableIdentifier identifier) { + return warehouseLocation + TestTables.tablePath(identifier); + } } static class HadoopCatalogTestTables extends TestTables { @@ -123,7 +153,8 @@ static class HadoopCatalogTestTables extends TestTables { private final String warehouseLocation; HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp) throws IOException { - this(conf, temp, temp.newFolder("hadoop", "warehouse").toString()); + this(conf, temp, (MetastoreUtil.hive3PresentOnClasspath() ? 
"file:" : "") + + temp.newFolder("hadoop", "warehouse").toString()); } HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation) { @@ -138,6 +169,14 @@ public Map properties() { InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, warehouseLocation ); } + + public String locationForCreateTableSQL(TableIdentifier identifier) { + return "LOCATION '" + warehouseLocation + TestTables.tablePath(identifier) + "' "; + } + + public String loadLocation(TableIdentifier identifier) { + return null; + } } static class HadoopTestTables extends TestTables { @@ -160,6 +199,16 @@ public String identifier(String tableIdentifier) { Assert.assertTrue(location.delete()); return location.toString(); } + + @Override + public String locationForCreateTableSQL(TableIdentifier identifier) { + return "LOCATION '" + temp.getRoot().getPath() + tablePath(identifier) + "' "; + } + + @Override + public String loadLocation(TableIdentifier identifier) { + return temp.getRoot().getPath() + TestTables.tablePath(identifier); + } } static class HiveTestTables extends TestTables { @@ -172,5 +221,18 @@ static class HiveTestTables extends TestTables { public Map properties() { return ImmutableMap.of(InputFormatConfig.CATALOG, "hive"); } + + @Override + public String locationForCreateTableSQL(TableIdentifier identifier) { + return ""; + } + + public String loadLocation(TableIdentifier identifier) { + return null; + } + } + + private static String tablePath(TableIdentifier identifier) { + return "/" + Joiner.on("/").join(identifier.namespace().levels()) + "/" + identifier.name(); } }