diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index ed3c96eca55c..badc911c1f7e 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -146,40 +146,28 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf); boolean threw = true; + boolean updateHiveTable = false; Optional lockId = Optional.empty(); try { lockId = Optional.of(acquireLock()); // TODO add lock heart beating for cases where default lock timeout is too low. - Table tbl; - if (base != null) { + + Table tbl = loadHmsTable(); + + if (tbl != null) { + // If we try to create the table but the metadata location is already set, then we had a concurrent commit + if (base == null && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != null) { + throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); + } + + updateHiveTable = true; LOG.debug("Committing existing table: {}", fullName); - tbl = metaClients.run(client -> client.getTable(database, tableName)); - tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes } else { + tbl = newHmsTable(); LOG.debug("Committing new table: {}", fullName); - final long currentTimeMillis = System.currentTimeMillis(); - tbl = new Table(tableName, - database, - System.getProperty("user.name"), - (int) currentTimeMillis / 1000, - (int) currentTimeMillis / 1000, - Integer.MAX_VALUE, - storageDescriptor(metadata, hiveEngineEnabled), - Collections.emptyList(), - new HashMap<>(), - null, - null, - TableType.EXTERNAL_TABLE.toString()); - tbl.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this } - // If needed set the 'storate_handler' property to enable query from Hive - if (hiveEngineEnabled) { - tbl.getParameters().put(hive_metastoreConstants.META_TABLE_STORAGE, - "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"); - } else { - tbl.getParameters().remove(hive_metastoreConstants.META_TABLE_STORAGE); - } + tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP); String baseMetadataLocation = base != null ? 
base.metadataFileLocation() : null; @@ -189,22 +177,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { baseMetadataLocation, metadataLocation, database, tableName); } - setParameters(newMetadataLocation, tbl); + setParameters(newMetadataLocation, tbl, hiveEngineEnabled); - if (base != null) { - metaClients.run(client -> { - EnvironmentContext envContext = new EnvironmentContext( - ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE) - ); - ALTER_TABLE.invoke(client, database, tableName, tbl, envContext); - return null; - }); - } else { - metaClients.run(client -> { - client.createTable(tbl); - return null; - }); - } + persistTable(tbl, updateHiveTable); threw = false; } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); @@ -231,7 +206,53 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { } } - private void setParameters(String newMetadataLocation, Table tbl) { + private void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException { + if (updateHiveTable) { + metaClients.run(client -> { + EnvironmentContext envContext = new EnvironmentContext( + ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE) + ); + ALTER_TABLE.invoke(client, database, tableName, hmsTable, envContext); + return null; + }); + } else { + metaClients.run(client -> { + client.createTable(hmsTable); + return null; + }); + } + } + + private Table loadHmsTable() throws TException, InterruptedException { + try { + return metaClients.run(client -> client.getTable(database, tableName)); + } catch (NoSuchObjectException nte) { + LOG.trace("Table not found {}", fullName, nte); + return null; + } + } + + private Table newHmsTable() { + final long currentTimeMillis = System.currentTimeMillis(); + + Table newTable = new Table(tableName, + database, + System.getProperty("user.name"), + (int) currentTimeMillis / 1000, + (int) currentTimeMillis / 1000, + Integer.MAX_VALUE, + null, + Collections.emptyList(), + new HashMap<>(), + null, + null, + TableType.EXTERNAL_TABLE.toString()); + + newTable.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this + return newTable; + } + + private void setParameters(String newMetadataLocation, Table tbl, boolean hiveEngineEnabled) { Map parameters = tbl.getParameters(); if (parameters == null) { @@ -245,6 +266,14 @@ private void setParameters(String newMetadataLocation, Table tbl) { parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); } + // If needed set the 'storage_handler' property to enable query from Hive + if (hiveEngineEnabled) { + parameters.put(hive_metastoreConstants.META_TABLE_STORAGE, + "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"); + } else { + parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE); + } + tbl.setParameters(parameters); } @@ -334,8 +363,6 @@ static void validateTableIsIceberg(Table table, String fullName) { String tableType = table.getParameters().get(TABLE_TYPE_PROP); NoSuchIcebergTableException.check(tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE), "Not an iceberg table: %s (type=%s)", fullName, tableType); - NoSuchIcebergTableException.check(table.getParameters().get(METADATA_LOCATION_PROP) != null, - "Not an iceberg table: %s missing %s", fullName, METADATA_LOCATION_PROP); } /** diff --git 
a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index 1a7b0067e77b..957cc712a153 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -30,6 +30,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.IHMSHandler; @@ -37,6 +40,7 @@ import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.hadoop.Util; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; @@ -49,6 +53,8 @@ public class TestHiveMetastore { + private static final String DEFAULT_DATABASE_NAME = "default"; + // create the metastore handlers based on whether we're working with Hive2 or Hive3 dependencies // we need to do this because there is a breaking API change between Hive2 and Hive3 private static final DynConstructors.Ctor HMS_HANDLER_CTOR = DynConstructors.builder() @@ -66,26 +72,35 @@ public class TestHiveMetastore { private ExecutorService executorService; private TServer server; private HiveMetaStore.HMSHandler baseHandler; + private HiveClientPool clientPool; public void start() { try { - hiveLocalDir = createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile(); + this.hiveLocalDir = createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile(); File derbyLogFile = new File(hiveLocalDir, "derby.log"); System.setProperty("derby.stream.error.file", derbyLogFile.getAbsolutePath()); setupMetastoreDB("jdbc:derby:" + getDerbyPath() + ";create=true"); TServerSocket socket = new TServerSocket(0); int port = socket.getServerSocket().getLocalPort(); - hiveConf = newHiveConf(port); - server = newThriftServer(socket, hiveConf); - executorService = Executors.newSingleThreadExecutor(); - executorService.submit(() -> server.serve()); + this.hiveConf = newHiveConf(port); + this.server = newThriftServer(socket, hiveConf); + this.executorService = Executors.newSingleThreadExecutor(); + this.executorService.submit(() -> server.serve()); + + // in Hive3, setting this as a system prop ensures that it will be picked up whenever a new HiveConf is created + System.setProperty(HiveConf.ConfVars.METASTOREURIS.varname, hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)); + + this.clientPool = new HiveClientPool(1, hiveConf); } catch (Exception e) { throw new RuntimeException("Cannot start TestHiveMetastore", e); } } public void stop() { + if (clientPool != null) { + clientPool.close(); + } if (server != null) { server.stop(); } @@ -104,11 +119,43 @@ public HiveConf hiveConf() { return hiveConf; } + public HiveClientPool clientPool() { + return clientPool; + } + public String getDatabasePath(String dbName) { File dbDir = new File(hiveLocalDir, dbName + ".db"); return dbDir.getPath(); } + public void reset() throws Exception { + for (String dbName : clientPool.run(client -> client.getAllDatabases())) { + for (String tblName : clientPool.run(client 
-> client.getAllTables(dbName))) { + clientPool.run(client -> { + client.dropTable(dbName, tblName, true, true, true); + return null; + }); + } + + if (!DEFAULT_DATABASE_NAME.equals(dbName)) { + // Drop cascade, functions dropped by cascade + clientPool.run(client -> { + client.dropDatabase(dbName, true, true, true); + return null; + }); + } + } + + Path warehouseRoot = new Path(hiveLocalDir.getAbsolutePath()); + FileSystem fs = Util.getFs(warehouseRoot, hiveConf); + for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) { + if (!fileStatus.getPath().getName().equals("derby.log") && + !fileStatus.getPath().getName().equals("metastore_db")) { + fs.delete(fileStatus.getPath(), true); + } + } + } + private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exception { HiveConf serverConf = new HiveConf(conf); serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true"); diff --git a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java index aba158add705..3b9894a8b0d0 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java +++ b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java @@ -59,8 +59,8 @@ public final class Catalogs { private static final String HADOOP = "hadoop"; private static final String HIVE = "hive"; - private static final String NAME = "name"; - private static final String LOCATION = "location"; + public static final String NAME = "name"; + public static final String LOCATION = "location"; private static final Set PROPERTIES_TO_REMOVE = ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME); @@ -177,6 +177,15 @@ public static boolean dropTable(Configuration conf, Properties props) { return new HadoopTables(conf).dropTable(location); } + /** + * Returns true if HiveCatalog is used + * @param conf a Hadoop conf + * @return true if the Catalog is HiveCatalog + */ + public static boolean hiveCatalog(Configuration conf) { + return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG)); + } + @VisibleForTesting static Optional loadCatalog(Configuration conf) { String catalogLoaderClass = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS); diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java index 1f2502559356..13edfaad825e 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java +++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java @@ -48,6 +48,7 @@ private InputFormatConfig() { public static final String CATALOG = "iceberg.mr.catalog"; public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location"; public static final String CATALOG_LOADER_CLASS = "iceberg.mr.catalog.loader.class"; + public static final String EXTERNAL_TABLE_PURGE = "external.table.purge"; public static final String CATALOG_NAME = "iceberg.catalog"; public static final String HADOOP_CATALOG = "hadoop.catalog"; diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java new file mode 100644 index 000000000000..351994c93000 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.util.Properties; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.mr.Catalogs; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HiveIcebergMetaHook implements HiveMetaHook { + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergMetaHook.class); + private static final Set PARAMETERS_TO_REMOVE = ImmutableSet + .of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, Catalogs.LOCATION, Catalogs.NAME); + private static final Set PROPERTIES_TO_REMOVE = ImmutableSet + .of(InputFormatConfig.EXTERNAL_TABLE_PURGE, hive_metastoreConstants.META_TABLE_STORAGE, "EXTERNAL", + "bucketing_version"); + + private final Configuration conf; + private Table icebergTable = null; + private Properties catalogProperties; + private boolean deleteIcebergTable; + private FileIO deleteIo; + private TableMetadata deleteMetadata; + + public HiveIcebergMetaHook(Configuration conf) { + this.conf = conf; + } + + @Override + public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { + this.catalogProperties = getCatalogProperties(hmsTable); + + // Set the table type even for non HiveCatalog based tables + hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, + BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase()); + + if (!Catalogs.hiveCatalog(conf)) { + // If not using HiveCatalog check for existing table + try { + this.icebergTable = Catalogs.loadTable(conf, catalogProperties); + + Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA) == null, + "Iceberg table already created - can not use provided schema"); + Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null, + "Iceberg table already created - can not use provided partition specification"); + + LOG.info("Iceberg table already 
exists {}", icebergTable); + + return; + } catch (NoSuchTableException nte) { + // If the table does not exist we will create it below + } + } + + // If the table does not exist collect data for table creation + String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA); + Preconditions.checkNotNull(schemaString, "Please provide a table schema"); + // Just check if it is parsable, and later use for partition specification parsing + Schema schema = SchemaParser.fromJson(schemaString); + + String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC); + if (specString != null) { + // Just check if it is parsable + PartitionSpecParser.fromJson(schema, specString); + } + + // Allow purging table data if the table is created now and not set otherwise + if (hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE) == null) { + hmsTable.getParameters().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "TRUE"); + } + + // Remove creation related properties + PARAMETERS_TO_REMOVE.forEach(hmsTable.getParameters()::remove); + } + + @Override + public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { + // do nothing + } + + @Override + public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { + if (icebergTable == null) { + if (Catalogs.hiveCatalog(conf)) { + catalogProperties.put(TableProperties.ENGINE_HIVE_ENABLED, true); + } + + Catalogs.createTable(conf, catalogProperties); + } + } + + @Override + public void preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { + this.catalogProperties = getCatalogProperties(hmsTable); + this.deleteIcebergTable = hmsTable.getParameters() != null && + "TRUE".equalsIgnoreCase(hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE)); + + if (deleteIcebergTable && Catalogs.hiveCatalog(conf)) { + // Store the metadata and the id for deleting the actual table data + String metadataLocation = hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + this.deleteIo = Catalogs.loadTable(conf, catalogProperties).io(); + this.deleteMetadata = TableMetadataParser.read(deleteIo, metadataLocation); + } + } + + @Override + public void rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { + // do nothing + } + + @Override + public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean deleteData) { + if (deleteData && deleteIcebergTable) { + if (!Catalogs.hiveCatalog(conf)) { + LOG.info("Dropping with purge all the data for table {}.{}", hmsTable.getDbName(), hmsTable.getTableName()); + Catalogs.dropTable(conf, catalogProperties); + } else { + CatalogUtil.dropTableData(deleteIo, deleteMetadata); + } + } + } + + /** + * Calculates the properties we would like to send to the catalog. + *
* <ul> + * <li>The base of the properties is the properties stored in the Hive Metastore for the given table</li> + * <li>We add the {@link Catalogs#LOCATION} as the table location</li> + * <li>We add the {@link Catalogs#NAME} as TableIdentifier defined by the database name and table name</li> + * <li>We remove the Hive Metastore specific parameters</li> + * </ul>
+ * @param hmsTable Table for which we are calculating the properties + * @return The properties we can provide for Iceberg functions, like {@link Catalogs} + */ + private Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) { + Properties properties = new Properties(); + properties.putAll(hmsTable.getParameters()); + + if (properties.get(Catalogs.LOCATION) == null && + hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null) { + properties.put(Catalogs.LOCATION, hmsTable.getSd().getLocation()); + } + + if (properties.get(Catalogs.NAME) == null) { + properties.put(Catalogs.NAME, TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()).toString()); + } + + // Remove creation related properties + PROPERTIES_TO_REMOVE.forEach(properties::remove); + + return properties; + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java index b46847ecf002..8c8f91ec15dc 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java @@ -29,25 +29,40 @@ import org.apache.hadoop.io.Writable; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector; import org.apache.iceberg.mr.mapred.Container; public class HiveIcebergSerDe extends AbstractSerDe { - private ObjectInspector inspector; @Override public void initialize(@Nullable Configuration configuration, Properties serDeProperties) throws SerDeException { + // HiveIcebergSerDe.initialize is called multiple places in Hive code: + // - When we are trying to create a table - HiveDDL data is stored at the serDeProperties, but no Iceberg table + // is created yet. + // - When we are compiling the Hive query on HiveServer2 side - We only have table information (location/name), + // and we have to read the schema using the table data. This is called multiple times so there is room for + // optimizing here. 
+ // - When we are executing the Hive query in the execution engine - We do not want to load the table data on every + // executor, but serDeProperties are populated by HiveIcebergStorageHandler.configureInputJobProperties() and + // the resulting properties are serialized and distributed to the executors + Schema tableSchema; if (configuration.get(InputFormatConfig.TABLE_SCHEMA) != null) { tableSchema = SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA)); + } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) { + tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA)); } else { - Table table = Catalogs.loadTable(configuration, serDeProperties); - tableSchema = table.schema(); + try { + tableSchema = Catalogs.loadTable(configuration, serDeProperties).schema(); + } catch (NoSuchTableException nte) { + throw new SerDeException("Please provide an existing table or a valid schema", nte); + } } + try { this.inspector = IcebergObjectInspector.create(tableSchema); } catch (Exception e) { diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 3264137eb7a3..fe759ac61e1a 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -42,8 +42,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler { - private static final String NAME = "name"; - private Configuration conf; @Override @@ -63,7 +61,7 @@ public Class getSerDeClass() { @Override public HiveMetaHook getMetaHook() { - return null; + return new HiveIcebergMetaHook(conf); } @Override @@ -76,7 +74,7 @@ public void configureInputJobProperties(TableDesc tableDesc, Map Properties props = tableDesc.getProperties(); Table table = Catalogs.loadTable(conf, props); - map.put(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(NAME)); + map.put(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(Catalogs.NAME)); map.put(InputFormatConfig.TABLE_LOCATION, table.location()); map.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema())); } @@ -102,7 +100,7 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { Properties props = tableDesc.getProperties(); Table table = Catalogs.loadTable(conf, props); - jobConf.set(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(NAME)); + jobConf.set(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(Catalogs.NAME)); jobConf.set(InputFormatConfig.TABLE_LOCATION, table.location()); jobConf.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema())); } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java index 5a09b6c088f8..ae08177d3541 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerBaseTest.java @@ -23,20 +23,39 @@ import com.klarna.hiverunner.StandaloneHiveRunner; import com.klarna.hiverunner.annotations.HiveSQL; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.Set; import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.iceberg.mr.Catalogs; +import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.TestHelper; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.types.Types; +import org.apache.thrift.TException; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -52,8 +71,6 @@ @RunWith(StandaloneHiveRunner.class) public abstract class HiveIcebergStorageHandlerBaseTest { - private static final String DEFAULT_DATABASE_NAME = "default"; - @HiveSQL(files = {}, autoStart = false) private HiveShell shell; @@ -84,6 +101,14 @@ public abstract class HiveIcebergStorageHandlerBaseTest { private static final PartitionSpec SPEC = PartitionSpec.unpartitioned(); + private static final PartitionSpec IDENTITY_SPEC = + PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("customer_id").build(); + + private static final Set IGNORED_PARAMS = + ImmutableSet.of("bucketing_version", StatsSetupConst.ROW_COUNT, + StatsSetupConst.RAW_DATA_SIZE, StatsSetupConst.TOTAL_SIZE, StatsSetupConst.NUM_FILES); + + // before variables protected static TestHiveMetastore metastore; private TestTables testTables; @@ -106,8 +131,6 @@ public static void afterClass() { @Before public void before() throws IOException { String metastoreUris = metastore.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS); - // in Hive3, setting this as a system prop ensures that it will be picked up whenever a new HiveConf is created - System.setProperty(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUris); testTables = testTables(metastore.hiveConf(), temp); @@ -124,16 +147,7 @@ public void before() throws IOException { @After public void after() throws Exception { - Hive db = Hive.get(metastore.hiveConf()); - for (String dbName : db.getAllDatabases()) { - for (String tblName : db.getAllTables(dbName)) { - db.dropTable(dbName, tblName); - } - if (!DEFAULT_DATABASE_NAME.equals(dbName)) { - // Drop cascade, functions dropped by cascade - db.dropDatabase(dbName, true, true, true); - } - } + metastore.reset(); } // PARQUET @@ -230,6 +244,290 @@ public void testJoinTables(FileFormat format) throws IOException { Assert.assertArrayEquals(new Object[] {1L, "Bob", 102L, 33.33d}, rows.get(2)); } + @Test + public void testCreateDropTable() throws TException, IOException, InterruptedException { + // We need the location for HadoopTable based tests only + String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + 
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + (location != null ? "LOCATION '" + location + "' " : "") + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " + + "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(IDENTITY_SPEC) + "', " + + "'dummy'='test')"); + + Properties properties = new Properties(); + properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); + if (location != null) { + properties.put(Catalogs.LOCATION, location); + } + + // Check the Iceberg table data + org.apache.iceberg.Table icebergTable = Catalogs.loadTable(shell.getHiveConf(), properties); + Assert.assertEquals(CUSTOMER_SCHEMA.asStruct(), icebergTable.schema().asStruct()); + Assert.assertEquals(IDENTITY_SPEC, icebergTable.spec()); + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + metastore.clientPool().run(client -> client.getTable("default", "customers")); + + Map hmsParams = hmsTable.getParameters(); + IGNORED_PARAMS.forEach(hmsParams::remove); + + // This is only set for HiveCatalog based tables. Check the value, then remove it so the other checks can be general + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertTrue(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) + .startsWith(icebergTable.location())); + hmsParams.remove(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + } + + // General metadata checks + Assert.assertEquals(6, hmsParams.size()); + Assert.assertEquals("test", hmsParams.get("dummy")); + Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE)); + Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL")); + Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME)); + Assert.assertEquals(HiveIcebergStorageHandler.class.getName(), + hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE)); + Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(), + hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); + + if (!Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertEquals(Collections.singletonMap("dummy", "test"), icebergTable.properties()); + + shell.executeStatement("DROP TABLE customers"); + + // Check if the table was really dropped even from the Catalog + AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, + "Table does not exist", () -> { + Catalogs.loadTable(shell.getHiveConf(), properties); + } + ); + } else { + Map expectedIcebergProperties = new HashMap<>(2); + expectedIcebergProperties.put("dummy", "test"); + expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true"); + Assert.assertEquals(expectedIcebergProperties, icebergTable.properties()); + + // Check the HMS table parameters + hmsTable = metastore.clientPool().run(client -> client.getTable("default", "customers")); + Path hmsTableLocation = new Path(hmsTable.getSd().getLocation()); + + // Drop the table + shell.executeStatement("DROP TABLE customers"); + + // Check if we drop an exception when trying to load the table + AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, + "Table does not exist", () -> { + Catalogs.loadTable(shell.getHiveConf(), properties); + } + ); + + // Check if the files are removed + FileSystem fs = Util.getFs(hmsTableLocation, shell.getHiveConf()); + Assert.assertEquals(1, 
fs.listStatus(hmsTableLocation).length); + Assert.assertEquals(0, fs.listStatus(new Path(hmsTableLocation, "metadata")).length); + } + } + + @Test + public void testCreateTableWithoutSpec() throws TException, InterruptedException { + // We need the location for HadoopTable based tests only + String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + (location != null ? "LOCATION '" + location + "' " : "") + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); + + Properties properties = new Properties(); + properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); + if (location != null) { + properties.put(Catalogs.LOCATION, location); + } + + // Check the Iceberg table partition data + org.apache.iceberg.Table icebergTable = Catalogs.loadTable(shell.getHiveConf(), properties); + Assert.assertEquals(SPEC, icebergTable.spec()); + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + metastore.clientPool().run(client -> client.getTable("default", "customers")); + + Map hmsParams = hmsTable.getParameters(); + IGNORED_PARAMS.forEach(hmsParams::remove); + + // Just check that the PartitionSpec is not set in the metadata + Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC)); + + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertEquals(6, hmsParams.size()); + } else { + Assert.assertEquals(5, hmsParams.size()); + } + } + + @Test + public void testCreateTableWithUnpartitionedSpec() throws TException, InterruptedException { + // We need the location for HadoopTable based tests only + String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + (location != null ? 
"LOCATION '" + location + "' " : "") + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " + + "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(SPEC) + "')"); + + Properties properties = new Properties(); + properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); + if (location != null) { + properties.put(Catalogs.LOCATION, location); + } + + // Check the Iceberg table partition data + org.apache.iceberg.Table icebergTable = Catalogs.loadTable(shell.getHiveConf(), properties); + Assert.assertEquals(SPEC, icebergTable.spec()); + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + metastore.clientPool().run(client -> client.getTable("default", "customers")); + + Map hmsParams = hmsTable.getParameters(); + IGNORED_PARAMS.forEach(hmsParams::remove); + + // Just check that the PartitionSpec is not set in the metadata + Assert.assertNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC)); + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + Assert.assertEquals(6, hmsParams.size()); + } else { + Assert.assertEquals(5, hmsParams.size()); + } + } + + @Test + public void testDeleteBackingTable() throws TException, IOException, InterruptedException { + // We need the location for HadoopTable based tests only + String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + (location != null ? "LOCATION '" + location + "' " : "") + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(CUSTOMER_SCHEMA) + "', " + + "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE')"); + + Properties properties = new Properties(); + properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); + if (location != null) { + properties.put(Catalogs.LOCATION, location); + } + + if (!Catalogs.hiveCatalog(shell.getHiveConf())) { + shell.executeStatement("DROP TABLE customers"); + + // Check if the table remains + Catalogs.loadTable(shell.getHiveConf(), properties); + } else { + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + metastore.clientPool().run(client -> client.getTable("default", "customers")); + Path hmsTableLocation = new Path(hmsTable.getSd().getLocation()); + + // Drop the table + shell.executeStatement("DROP TABLE customers"); + + // Check if we drop an exception when trying to drop the table + AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class, + "Table does not exist", () -> { + Catalogs.loadTable(shell.getHiveConf(), properties); + } + ); + + // Check if the files are kept + FileSystem fs = Util.getFs(hmsTableLocation, shell.getHiveConf()); + Assert.assertEquals(1, fs.listStatus(hmsTableLocation).length); + Assert.assertEquals(1, fs.listStatus(new Path(hmsTableLocation, "metadata")).length); + } + } + + @Test + public void testCreateTableError() { + String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + + // Wrong schema + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Unrecognized token 'WrongSchema'", () -> { + shell.executeQuery("CREATE EXTERNAL TABLE withShell2 " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + (location != null ? 
"LOCATION '" + location + "' " : "") + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema')"); + } + ); + + // Missing schema, we try to get the schema from the table and fail + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Please provide an existing table or a valid schema", () -> { + shell.executeQuery("CREATE EXTERNAL TABLE withShell2 " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + (location != null ? "LOCATION '" + location + "' " : "")); + } + ); + + if (location != null) { + // Missing location + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "Table location not set", () -> { + shell.executeQuery("CREATE EXTERNAL TABLE withShell2 " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); + } + ); + } + } + + @Test + public void testCreateTableAboveExistingTable() throws TException, IOException, InterruptedException { + // Create the Iceberg table + createIcebergTable("customers", CUSTOMER_SCHEMA, FileFormat.PARQUET, Collections.emptyList()); + + if (Catalogs.hiveCatalog(shell.getHiveConf())) { + + // In HiveCatalog we just expect an exception since the table is already exists + AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class, + "customers already exists", () -> { + shell.executeQuery("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + + SchemaParser.toJson(CUSTOMER_SCHEMA) + "')"); + } + ); + } else { + // We need the location for HadoopTable based tests only + String location = locationForCreateTable(temp.getRoot().getPath(), "customers"); + + shell.executeStatement("CREATE EXTERNAL TABLE customers " + + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " + + (location != null ? 
"LOCATION '" + location + "'" : "")); + + Properties properties = new Properties(); + properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString()); + if (location != null) { + properties.put(Catalogs.LOCATION, location); + } + + // Check the HMS table parameters + org.apache.hadoop.hive.metastore.api.Table hmsTable = + metastore.clientPool().run(client -> client.getTable("default", "customers")); + + Map hmsParams = hmsTable.getParameters(); + IGNORED_PARAMS.forEach(hmsParams::remove); + + Assert.assertEquals(4, hmsParams.size()); + Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL")); + Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME)); + Assert.assertEquals(HiveIcebergStorageHandler.class.getName(), + hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE)); + Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(), + hmsTable.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); + } + } + protected void createTable(String tableName, Schema schema, FileFormat format, List records) throws IOException { Table table = createIcebergTable(tableName, schema, format, records); @@ -257,4 +555,8 @@ protected void createHiveTable(String tableName, String location) { "LOCATION '%s'", tableName, HiveIcebergStorageHandler.class.getName(), location)); } + + protected String locationForCreateTable(String tempDirName, String tableName) { + return null; + } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java index 9781dbefbe61..8ca2a71ebe49 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHadoopTables.java @@ -28,4 +28,9 @@ public class TestHiveIcebergStorageHandlerWithHadoopTables extends HiveIcebergSt public TestTables testTables(Configuration conf, TemporaryFolder temp) { return new TestTables.HadoopTestTables(conf, temp); } + + @Override + protected String locationForCreateTable(String tempDirName, String tableName) { + return tempDirName + "/default/" + tableName; + } } diff --git a/mr/src/test/resources/log4j.properties b/mr/src/test/resources/log4j.properties new file mode 100644 index 000000000000..033098d8a5b2 --- /dev/null +++ b/mr/src/test/resources/log4j.properties @@ -0,0 +1,33 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# log4j configuration used during build and unit tests + +log4j.rootLogger=DEBUG,stdout +log4j.threshhold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
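
A note on the reworked commit path in HiveTableOperations: doCommit() now loads the HMS table up front, treats a metadata_location that is already set on a create (base == null) as a concurrent commit, and funnels both branches through persistTable(). The sketch below only models that decision; CommitAction, CommitDecision and the boolean/string parameters are invented for the illustration and are not names from the patch, which works directly on org.apache.hadoop.hive.metastore.api.Table.

enum CommitAction { CREATE, UPDATE }

final class CommitDecision {
  private CommitDecision() {
  }

  // hmsTableExists: whether loadHmsTable() found an existing HMS table
  // hmsMetadataLocation: the metadata_location parameter on that table, or null
  // isCreate: true when base == null, i.e. this commit creates a brand new table
  static CommitAction decide(boolean hmsTableExists, String hmsMetadataLocation, boolean isCreate) {
    if (hmsTableExists) {
      if (isCreate && hmsMetadataLocation != null) {
        // another writer created the table between the existence check and this commit
        throw new IllegalStateException("Table already exists");
      }
      return CommitAction.UPDATE; // persistTable() issues alter_table with DO_NOT_UPDATE_STATS
    }
    return CommitAction.CREATE;   // persistTable() issues createTable()
  }
}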
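HiveIcebergMetaHook ties Hive DDL to the Iceberg catalogs: preCreateTable() validates or parses the supplied schema and partition spec and defaults external.table.purge to TRUE, commitCreateTable() creates the Iceberg table, and commitDropTable() purges data only while that flag is TRUE. A minimal usage sketch, assuming a HiveRunner HiveShell and a HiveCatalog-backed configuration (so no LOCATION clause is needed); MetaHookDdlSketch and createAndDrop are illustrative names, not part of the patch.

import com.klarna.hiverunner.HiveShell;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.mr.InputFormatConfig;

class MetaHookDdlSketch {
  // CREATE TABLE runs HiveIcebergMetaHook.preCreateTable()/commitCreateTable();
  // DROP TABLE runs preDropTable()/commitDropTable(). Data files are purged only while
  // 'external.table.purge' is TRUE (the hook's default), so this table keeps its files on drop.
  static void createAndDrop(HiveShell shell, Schema schema) {
    shell.executeStatement("CREATE EXTERNAL TABLE customers "
        + "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' "
        + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(schema) + "', "
        + "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE')");

    shell.executeStatement("DROP TABLE customers"); // removes the HMS entry, keeps the Iceberg data
  }
}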
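Since Catalogs.NAME and Catalogs.LOCATION are now public, a caller can identify a table for Catalogs.loadTable() with a plain Properties object, which is what the meta hook and the new tests do. A sketch under that assumption; CatalogsLoadSketch is an illustrative wrapper, not part of the patch.

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.mr.Catalogs;

class CatalogsLoadSketch {
  // Addresses default.customers through the now-public Catalogs.NAME / Catalogs.LOCATION keys,
  // the same way HiveIcebergMetaHook.getCatalogProperties() and the new tests do.
  static Table load(Configuration conf, String locationOrNull) {
    Properties properties = new Properties();
    properties.put(Catalogs.NAME, TableIdentifier.of("default", "customers").toString());
    if (locationOrNull != null) {
      properties.put(Catalogs.LOCATION, locationOrNull); // only needed for HadoopTables-backed tables
    }
    return Catalogs.loadTable(conf, properties);
  }
}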
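The TestHiveMetastore changes give the tests a reusable embedded metastore: start() now also exports hive.metastore.uris as a system property and opens a HiveClientPool, and reset() replaces the Hive.get(...)-based cleanup that used to live in @After. A minimal lifecycle sketch; MetastoreLifecycleSketch is an illustrative name only.

import org.apache.iceberg.hive.HiveClientPool;
import org.apache.iceberg.hive.TestHiveMetastore;

class MetastoreLifecycleSketch {
  // One embedded metastore per test class; reset() between tests instead of tearing it down.
  static void run() throws Exception {
    TestHiveMetastore metastore = new TestHiveMetastore();
    metastore.start();                                // also exports hive.metastore.uris as a system property
    HiveClientPool clients = metastore.clientPool();  // shared client pool opened by start()
    clients.run(client -> client.getAllDatabases());  // direct HMS calls go through the pool
    metastore.reset();                                // drop every table/database, clean the warehouse dir
    metastore.stop();                                 // closes the pool and shuts down the Thrift server
  }
}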