Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

165 changes: 146 additions & 19 deletions mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,41 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.Tables;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.hive.MetastoreUtil;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestCatalogs;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
import org.junit.Assert;
import org.junit.rules.TemporaryFolder;

// Helper class for setting up and testing various catalog implementations
abstract class TestTables {
public static final TestTableType[] ALL_TABLE_TYPES = new TestTableType[] {
TestTableType.HADOOP_TABLE,
TestTableType.HADOOP_CATALOG,
TestTableType.CUSTOM_CATALOG,
TestTableType.HIVE_CATALOG
};

private final Tables tables;
protected final TemporaryFolder temp;
Expand Down Expand Up @@ -83,12 +93,111 @@ public Tables tables() {
public abstract String locationForCreateTableSQL(TableIdentifier identifier);

/**
* If the {@link Catalogs#LOCATION} is needed for {@link Catalogs#loadTable(Configuration, Properties)} then this
* method should provide the location string. It should return <code>null</code> if the location is not needed.
* @param identifier The table identifier
* @return The location string for loadTable operation
* If an independent Hive table creation is needed for the given Catalog then this should return the Hive SQL
* string which we have to execute. Overridden for HiveCatalog where the Hive table is immediately created
* during the Iceberg table creation so no extra sql execution is required.
* @param identifier The table identifier (the namespace should be non-empty and single level)
* @return The SQL string - which should be executed, null - if it is not needed.
*/
public String createHiveTableSQL(TableIdentifier identifier) {
Preconditions.checkArgument(!identifier.namespace().isEmpty(), "Namespace should not be empty");
Preconditions.checkArgument(identifier.namespace().levels().length == 1, "Namespace should be single level");
return String.format("CREATE TABLE %s.%s STORED BY '%s' %s", identifier.namespace(), identifier.name(),
HiveIcebergStorageHandler.class.getName(), locationForCreateTableSQL(identifier));
}

/**
* Loads the given table from the actual catalog. Overridden by HadoopTables, since the parameter of the
* {@link Tables#load(String)} should be the full path of the table metadata directory
* @param identifier The table we want to load
* @return The Table loaded from the Catalog
*/
public abstract String loadLocation(TableIdentifier identifier);
public Table loadTable(TableIdentifier identifier) {
return tables.load(identifier.toString());
}

/**
* Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when
* needed. The table will be in the 'default' database. The table will be populated with the provided List of
* {@link Record}s.
* @param shell The HiveShell used for Hive table creation
* @param tableName The name of the test table
* @param schema The schema used for the table creation
* @param fileFormat The file format used for writing the data
* @param records The records with which the table is populated
* @throws IOException If there is an error writing data
*/
public void createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
List<Record> records) throws IOException {
createIcebergTable(shell.getHiveConf(), tableName, schema, fileFormat, records);
String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName));
if (createHiveSQL != null) {
shell.executeStatement(createHiveSQL);
}
}

/**
* Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive table as well when
* needed. The table will be in the 'default' database. The table will be populated with the provided with randomly
* generated {@link Record}s.
* @param shell The HiveShell used for Hive table creation
* @param tableName The name of the test table
* @param schema The schema used for the table creation
* @param fileFormat The file format used for writing the data
* @param numRecords The number of records should be generated and stored in the table
* @throws IOException If there is an error writing data
*/
public List<Record> createTableWithGeneratedRecords(TestHiveShell shell, String tableName, Schema schema,
FileFormat fileFormat, int numRecords) throws IOException {
List<Record> records = TestHelper.generateRandomRecords(schema, numRecords, 0L);
createTable(shell, tableName, schema, fileFormat, records);
return records;
}

/**
* Creates an Iceberg table/data without creating the corresponding Hive table. The table will be in the 'default'
* namespace.
* @param configuration The configuration used during the table creation
* @param tableName The name of the test table
* @param schema The schema used for the table creation
* @param fileFormat The file format used for writing the data
* @param records The records with which the table is populated
* @return The create table
* @throws IOException If there is an error writing data
*/
public Table createIcebergTable(Configuration configuration, String tableName, Schema schema, FileFormat fileFormat,
List<Record> records) throws IOException {
String identifier = identifier("default." + tableName);
TestHelper helper = new TestHelper(new Configuration(configuration), tables(), identifier, schema,
PartitionSpec.unpartitioned(), fileFormat, temp);
Table table = helper.createTable();

if (records != null && !records.isEmpty()) {
helper.appendToTable(helper.writeFile(null, records));
}

return table;
}

/**
* Append more data to the table.
* @param configuration The configuration used during the table creation
* @param table The table to append
* @param format The file format used for writing the data
* @param partition The partition to write to
* @param records The records with which should be added to the table
* @throws IOException If there is an error writing data
*/
public void appendIcebergTable(Configuration configuration, Table table, FileFormat format, StructLike partition,
List<Record> records) throws IOException {
TestHelper helper = new TestHelper(
configuration, null, null, null, null, format, temp);

helper.setTable(table);
if (!records.isEmpty()) {
helper.appendToTable(helper.writeFile(partition, records));
}
}

private static class CatalogToTables implements Tables {

Expand Down Expand Up @@ -142,10 +251,6 @@ public String locationForCreateTableSQL(TableIdentifier identifier) {
return "LOCATION '" + warehouseLocation + TestTables.tablePath(identifier) + "' ";
}

@Override
public String loadLocation(TableIdentifier identifier) {
return warehouseLocation + TestTables.tablePath(identifier);
}
}

static class HadoopCatalogTestTables extends TestTables {
Expand Down Expand Up @@ -173,14 +278,9 @@ public Map<String, String> properties() {
public String locationForCreateTableSQL(TableIdentifier identifier) {
return "LOCATION '" + warehouseLocation + TestTables.tablePath(identifier) + "' ";
}

public String loadLocation(TableIdentifier identifier) {
return null;
}
}

static class HadoopTestTables extends TestTables {

HadoopTestTables(Configuration conf, TemporaryFolder temp) {
super(new HadoopTables(conf), temp);
}
Expand All @@ -206,9 +306,10 @@ public String locationForCreateTableSQL(TableIdentifier identifier) {
}

@Override
public String loadLocation(TableIdentifier identifier) {
return temp.getRoot().getPath() + TestTables.tablePath(identifier);
public Table loadTable(TableIdentifier identifier) {
return tables().load(temp.getRoot().getPath() + TestTables.tablePath(identifier));
}

}

static class HiveTestTables extends TestTables {
Expand All @@ -227,12 +328,38 @@ public String locationForCreateTableSQL(TableIdentifier identifier) {
return "";
}

public String loadLocation(TableIdentifier identifier) {
@Override
public String createHiveTableSQL(TableIdentifier identifier) {
return null;
}
}

private static String tablePath(TableIdentifier identifier) {
return "/" + Joiner.on("/").join(identifier.namespace().levels()) + "/" + identifier.name();
}

enum TestTableType {
HADOOP_TABLE {
public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) {
return new HadoopTestTables(conf, temporaryFolder);
}
},
HADOOP_CATALOG {
public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException {
return new HadoopCatalogTestTables(conf, temporaryFolder);
}
},
CUSTOM_CATALOG {
public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException {
return new CustomCatalogTestTables(conf, temporaryFolder);
}
},
HIVE_CATALOG {
public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) {
return new HiveTestTables(conf, temporaryFolder);
}
};

public abstract TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException;
}
}