@@ -146,40 +146,28 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
     boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
 
     boolean threw = true;
+    boolean updateHiveTable = false;
     Optional<Long> lockId = Optional.empty();
     try {
       lockId = Optional.of(acquireLock());
       // TODO add lock heart beating for cases where default lock timeout is too low.
-      Table tbl;
-      if (base != null) {
+
+      Table tbl = loadHmsTable();
+
+      if (tbl != null) {
+        // If we try to create the table but the metadata location is already set, then we had a concurrent commit
+        if (base == null && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != null) {
+          throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
+        }
+
+        updateHiveTable = true;
         LOG.debug("Committing existing table: {}", fullName);
-        tbl = metaClients.run(client -> client.getTable(database, tableName));
-        tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes
       } else {
+        tbl = newHmsTable();
         LOG.debug("Committing new table: {}", fullName);
-        final long currentTimeMillis = System.currentTimeMillis();
-        tbl = new Table(tableName,
-            database,
-            System.getProperty("user.name"),
-            (int) currentTimeMillis / 1000,
-            (int) currentTimeMillis / 1000,
-            Integer.MAX_VALUE,
-            storageDescriptor(metadata, hiveEngineEnabled),
-            Collections.emptyList(),
-            new HashMap<>(),
-            null,
-            null,
-            TableType.EXTERNAL_TABLE.toString());
-        tbl.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
       }
 
-      // If needed set the 'storate_handler' property to enable query from Hive
-      if (hiveEngineEnabled) {
-        tbl.getParameters().put(hive_metastoreConstants.META_TABLE_STORAGE,
-            "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler");
-      } else {
-        tbl.getParameters().remove(hive_metastoreConstants.META_TABLE_STORAGE);
-      }
+      tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes
 
       String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
       String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
@@ -189,22 +177,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
             baseMetadataLocation, metadataLocation, database, tableName);
       }
 
-      setParameters(newMetadataLocation, tbl);
+      setParameters(newMetadataLocation, tbl, hiveEngineEnabled);
 
-      if (base != null) {
-        metaClients.run(client -> {
-          EnvironmentContext envContext = new EnvironmentContext(
-              ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)
-          );
-          ALTER_TABLE.invoke(client, database, tableName, tbl, envContext);
-          return null;
-        });
-      } else {
-        metaClients.run(client -> {
-          client.createTable(tbl);
-          return null;
-        });
-      }
+      persistTable(tbl, updateHiveTable);
       threw = false;
     } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
       throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);
@@ -231,7 +206,53 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
     }
   }
 
-  private void setParameters(String newMetadataLocation, Table tbl) {
+  private void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException {
+    if (updateHiveTable) {
+      metaClients.run(client -> {
+        EnvironmentContext envContext = new EnvironmentContext(
+            ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)
+        );
+        ALTER_TABLE.invoke(client, database, tableName, hmsTable, envContext);
+        return null;
+      });
+    } else {
+      metaClients.run(client -> {
+        client.createTable(hmsTable);
+        return null;
+      });
+    }
+  }
+
+  private Table loadHmsTable() throws TException, InterruptedException {
+    try {
+      return metaClients.run(client -> client.getTable(database, tableName));
+    } catch (NoSuchObjectException nte) {
+      LOG.trace("Table not found {}", fullName, nte);
+      return null;
+    }
+  }
+
+  private Table newHmsTable() {
+    final long currentTimeMillis = System.currentTimeMillis();
+
+    Table newTable = new Table(tableName,
+        database,
+        System.getProperty("user.name"),
+        (int) currentTimeMillis / 1000,
+        (int) currentTimeMillis / 1000,
+        Integer.MAX_VALUE,
+        null,
+        Collections.emptyList(),
+        new HashMap<>(),
+        null,
+        null,
+        TableType.EXTERNAL_TABLE.toString());
+
+    newTable.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
+    return newTable;
+  }
+
+  private void setParameters(String newMetadataLocation, Table tbl, boolean hiveEngineEnabled) {
     Map<String, String> parameters = tbl.getParameters();
 
     if (parameters == null) {
@@ -245,6 +266,14 @@ private void setParameters(String newMetadataLocation, Table tbl) {
       parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
     }
 
+    // If needed set the 'storage_handler' property to enable query from Hive
+    if (hiveEngineEnabled) {
+      parameters.put(hive_metastoreConstants.META_TABLE_STORAGE,
+          "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler");
+    } else {
+      parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE);
+    }
+
     tbl.setParameters(parameters);
   }

@@ -334,8 +363,6 @@ static void validateTableIsIceberg(Table table, String fullName) {
     String tableType = table.getParameters().get(TABLE_TYPE_PROP);
     NoSuchIcebergTableException.check(tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE),
         "Not an iceberg table: %s (type=%s)", fullName, tableType);
-    NoSuchIcebergTableException.check(table.getParameters().get(METADATA_LOCATION_PROP) != null,
-        "Not an iceberg table: %s missing %s", fullName, METADATA_LOCATION_PROP);
   }
 
   /**
================================================================================

@@ -30,13 +30,17 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
 import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
 import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.hadoop.Util;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
@@ -49,6 +53,8 @@

 public class TestHiveMetastore {
 
+  private static final String DEFAULT_DATABASE_NAME = "default";
+
   // create the metastore handlers based on whether we're working with Hive2 or Hive3 dependencies
   // we need to do this because there is a breaking API change between Hive2 and Hive3
   private static final DynConstructors.Ctor<HiveMetaStore.HMSHandler> HMS_HANDLER_CTOR = DynConstructors.builder()
@@ -66,26 +72,35 @@ public class TestHiveMetastore {
   private ExecutorService executorService;
   private TServer server;
   private HiveMetaStore.HMSHandler baseHandler;
+  private HiveClientPool clientPool;
 
   public void start() {
     try {
-      hiveLocalDir = createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile();
+      this.hiveLocalDir = createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile();
       File derbyLogFile = new File(hiveLocalDir, "derby.log");
       System.setProperty("derby.stream.error.file", derbyLogFile.getAbsolutePath());
       setupMetastoreDB("jdbc:derby:" + getDerbyPath() + ";create=true");
 
       TServerSocket socket = new TServerSocket(0);
       int port = socket.getServerSocket().getLocalPort();
-      hiveConf = newHiveConf(port);
-      server = newThriftServer(socket, hiveConf);
-      executorService = Executors.newSingleThreadExecutor();
-      executorService.submit(() -> server.serve());
+      this.hiveConf = newHiveConf(port);
+      this.server = newThriftServer(socket, hiveConf);
+      this.executorService = Executors.newSingleThreadExecutor();
+      this.executorService.submit(() -> server.serve());
+
+      // in Hive3, setting this as a system prop ensures that it will be picked up whenever a new HiveConf is created
+      System.setProperty(HiveConf.ConfVars.METASTOREURIS.varname, hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
 
  > Review thread on the System.setProperty(METASTOREURIS) line above:
  >
  > Contributor: Why is this needed? It may be nice to use this for applications, but for testing it seems
  > like this allows us to pass the configuration incorrectly and still have tests pass. If it isn't needed,
  > I'd rather remove it so that we must pass configuration properly.
  >
  > Contributor (author): The Hive3 patch brought this in. If I understood @marton-bod correctly, this was
  > needed because in HiveRunner there are some places where the config is not passed down, and this was the
  > only way to fix the HMS client misery. What I did here was only move it from the test class to the
  > TestHiveMetastore class so it will affect the ClientPools as well.
  >
  > Collaborator: That's correct. The rationale is outlined in #1478 (comment). As you mentioned, when using
  > Hive3, not all threads spawned during HiveRunner initialization receive the custom properties we set on
  > the HiveShell, so using the system props ensures they are picked up during HiveConf construction. This is
  > needed to get the tests working properly on Hive3, without the PersistenceManager-related flaky tests.
  >
  > Contributor: Okay, thanks for the reasoning. I guess I missed that this required a global setting in the
  > other PR. I think this is fairly safe, since we always want to override the metastore URI using an option
  > from the catalog's config.
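  The mechanism the thread relies on is HiveConf picking up matching system properties when it is
  constructed. A minimal sketch of that behavior (illustration only, not part of this diff; the URI value
  is a placeholder):

      // A system property named after a Hive conf var is applied when a HiveConf is built,
      // so code paths that never receive our HiveConf instance still see the metastore URI.
      System.setProperty(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:9083"); // placeholder
      HiveConf freshConf = new HiveConf(); // constructed without any explicit configuration passed down
      String uris = freshConf.getVar(HiveConf.ConfVars.METASTOREURIS); // "thrift://localhost:9083"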
+
+      this.clientPool = new HiveClientPool(1, hiveConf);
     } catch (Exception e) {
       throw new RuntimeException("Cannot start TestHiveMetastore", e);
     }
   }
 
   public void stop() {
+    if (clientPool != null) {
+      clientPool.close();
+    }
     if (server != null) {
       server.stop();
     }
@@ -104,11 +119,43 @@ public HiveConf hiveConf() {
     return hiveConf;
   }
 
+  public HiveClientPool clientPool() {
+    return clientPool;
+  }
+
   public String getDatabasePath(String dbName) {
     File dbDir = new File(hiveLocalDir, dbName + ".db");
     return dbDir.getPath();
   }
 
+  public void reset() throws Exception {
+    for (String dbName : clientPool.run(client -> client.getAllDatabases())) {
+      for (String tblName : clientPool.run(client -> client.getAllTables(dbName))) {
+        clientPool.run(client -> {
+          client.dropTable(dbName, tblName, true, true, true);
+          return null;
+        });
+      }
+
+      if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
+        // Drop cascade, functions dropped by cascade
+        clientPool.run(client -> {
+          client.dropDatabase(dbName, true, true, true);
+          return null;
+        });
+      }
+    }
+
+    Path warehouseRoot = new Path(hiveLocalDir.getAbsolutePath());
+    FileSystem fs = Util.getFs(warehouseRoot, hiveConf);
+    for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
+      if (!fileStatus.getPath().getName().equals("derby.log") &&
+          !fileStatus.getPath().getName().equals("metastore_db")) {
+        fs.delete(fileStatus.getPath(), true);
+      }
+    }
+  }
+
   private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exception {
     HiveConf serverConf = new HiveConf(conf);
     serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true");
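  Taken together, the new members compose into a per-test-class lifecycle. A sketch of the intended usage
  (assumed harness code, not part of this diff; only start/stop/reset/clientPool/hiveConf appear above):

      TestHiveMetastore metastore = new TestHiveMetastore();
      metastore.start();                               // boots a Derby-backed HMS on a free port
      HiveClientPool clients = metastore.clientPool(); // shared pool created once in start()
      // ... run tests against metastore.hiveConf() and the client pool ...
      metastore.reset();                               // drops all tables/DBs, cleans the warehouse dir
      metastore.stop();                                // closes the pool, stops the Thrift server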
================================================================================
mr/src/main/java/org/apache/iceberg/mr/Catalogs.java (13 changes: 11 additions & 2 deletions)

@@ -59,8 +59,8 @@ public final class Catalogs {
   private static final String HADOOP = "hadoop";
   private static final String HIVE = "hive";
 
-  private static final String NAME = "name";
-  private static final String LOCATION = "location";
+  public static final String NAME = "name";
+  public static final String LOCATION = "location";
 
   private static final Set<String> PROPERTIES_TO_REMOVE =
       ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME);
@@ -177,6 +177,15 @@ public static boolean dropTable(Configuration conf, Properties props) {
     return new HadoopTables(conf).dropTable(location);
   }
 
+  /**
+   * Returns true if HiveCatalog is used
+   * @param conf a Hadoop conf
+   * @return true if the Catalog is HiveCatalog
+   */
+  public static boolean hiveCatalog(Configuration conf) {
+    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  }
+
   @VisibleForTesting
   static Optional<Catalog> loadCatalog(Configuration conf) {
     String catalogLoaderClass = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
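  A quick usage sketch for the new public helper (illustrative only, not part of this diff; "hive" mirrors
  the private HIVE constant above, and the comparison is case-insensitive):

      Configuration conf = new Configuration();
      conf.set(InputFormatConfig.CATALOG, "hive"); // InputFormatConfig.CATALOG = "iceberg.mr.catalog"
      boolean usesHiveCatalog = Catalogs.hiveCatalog(conf); // true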
================================================================================

@@ -48,6 +48,7 @@ private InputFormatConfig() {
   public static final String CATALOG = "iceberg.mr.catalog";
   public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location";
   public static final String CATALOG_LOADER_CLASS = "iceberg.mr.catalog.loader.class";
+  public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
 
   public static final String CATALOG_NAME = "iceberg.catalog";
   public static final String HADOOP_CATALOG = "hadoop.catalog";