@@ -351,13 +351,17 @@ public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
}

private class HadoopCatalogTableBuilder extends BaseMetastoreCatalogTableBuilder {
private final String defaultLocation;

private HadoopCatalogTableBuilder(TableIdentifier identifier, Schema schema) {
super(identifier, schema);
defaultLocation = defaultWarehouseLocation(identifier);
}

@Override
public TableBuilder withLocation(String location) {
Preconditions.checkArgument(location == null, "Cannot set a custom location for a path-based table");
Preconditions.checkArgument(location == null || location.equals(defaultLocation),
"Cannot set a custom location for a path-based table. Expected " + defaultLocation + " but got " + location);
return this;
}
}
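
The relaxed precondition above now accepts a caller-supplied location as long as it matches the path HadoopCatalog would compute anyway. A minimal usage sketch, assuming a warehouse at /tmp/warehouse (the class name, identifiers and path literals below are invented for illustration, not taken from the PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class HadoopCatalogLocationExample {
  public static void main(String[] args) {
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "/tmp/warehouse");
    TableIdentifier identifier = TableIdentifier.of("db", "tbl");
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    // Accepted after this change: the location equals defaultWarehouseLocation(identifier).
    catalog.buildTable(identifier, schema)
        .withLocation("/tmp/warehouse/db/tbl")
        .create();

    // Still rejected: a genuinely custom location fails the precondition.
    // catalog.buildTable(identifier, schema).withLocation("/somewhere/else").create();
  }
}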
@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
@@ -89,22 +90,27 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)
}

// If the table does not exist collect data for table creation
String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
Preconditions.checkNotNull(schemaString, "Please provide a table schema");
// Just check if it is parsable, and later use for partition specification parsing
Schema schema = SchemaParser.fromJson(schemaString);

String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
if (specString != null) {
// Just check if it is parsable
PartitionSpecParser.fromJson(schema, specString);
}
// - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC take precedence so the user can override the
// Iceberg schema and specification generated by the code
// - Partitioned Hive tables are currently not allowed

Schema schema = schema(catalogProperties, hmsTable);
PartitionSpec spec = spec(schema, catalogProperties, hmsTable);

catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));

// Allow purging table data if the table is created now and not set otherwise
if (hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE) == null) {
hmsTable.getParameters().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "TRUE");
}

// If the table is not managed by Hive catalog then the location should be set
if (!Catalogs.hiveCatalog(conf)) {
Preconditions.checkArgument(hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null,
"Table location not set");
}

// Remove creation related properties
PARAMETERS_TO_REMOVE.forEach(hmsTable.getParameters()::remove);
}
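
For reference, a small sketch of the two configuration paths the hook now handles, not taken from the PR itself: if the Iceberg schema/spec properties are present they win, otherwise the schema is derived from the Hive columns and the spec defaults to unpartitioned (the example class and field names are assumptions).

import java.util.Properties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.types.Types;

public class PreCreateTablePropertiesExample {
  public static Properties explicitSchemaProperties() {
    // Path 1 - explicit Iceberg schema and spec; these override anything derived from the Hive columns.
    Properties props = new Properties();
    Schema schema = new Schema(Types.NestedField.optional(1, "id", Types.LongType.get()));
    props.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
    props.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(PartitionSpec.unpartitioned()));
    return props;
  }

  // Path 2 - neither property is set: schema(...) falls back to HiveSchemaUtil.schema(hmsTable.getSd().getCols())
  // and spec(...) returns PartitionSpec.unpartitioned(); partitioned Hive tables are rejected outright.
}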
@@ -167,7 +173,7 @@ public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable,
* @param hmsTable Table for which we are calculating the properties
* @return The properties we can provide for Iceberg functions, like {@link Catalogs}
*/
private Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
Properties properties = new Properties();
properties.putAll(hmsTable.getParameters());

@@ -185,4 +191,25 @@ private Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Tab

return properties;
}

private static Schema schema(Properties properties, org.apache.hadoop.hive.metastore.api.Table hmsTable) {
if (properties.getProperty(InputFormatConfig.TABLE_SCHEMA) != null) {
return SchemaParser.fromJson(properties.getProperty(InputFormatConfig.TABLE_SCHEMA));
} else {
return HiveSchemaUtil.schema(hmsTable.getSd().getCols());
}
}

private static PartitionSpec spec(Schema schema, Properties properties,
org.apache.hadoop.hive.metastore.api.Table hmsTable) {

Preconditions.checkArgument(hmsTable.getPartitionKeys() == null || hmsTable.getPartitionKeys().isEmpty(),
"Partitioned Hive tables are currently not supported");

if (properties.getProperty(InputFormatConfig.PARTITION_SPEC) != null) {
return PartitionSpecParser.fromJson(schema, properties.getProperty(InputFormatConfig.PARTITION_SPEC));
} else {
return PartitionSpec.unpartitioned();
}
}
}
39 changes: 36 additions & 3 deletions mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -22,20 +22,25 @@
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.mr.mapred.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveIcebergSerDe extends AbstractSerDe {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergSerDe.class);

private ObjectInspector inspector;

@Override
@@ -57,9 +62,12 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
} else {
try {
// always prefer the original table schema if there is one
tableSchema = Catalogs.loadTable(configuration, serDeProperties).schema();
} catch (NoSuchTableException nte) {
throw new SerDeException("Please provide an existing table or a valid schema", nte);
LOG.info("Using schema from existing table {}", SchemaParser.toJson(tableSchema));
} catch (Exception e) {
// If we can not load the table try the provided hive schema
tableSchema = hiveSchemaOrThrow(serDeProperties, e);
}
}

@@ -94,4 +102,29 @@ public Object deserialize(Writable writable) {
public ObjectInspector getObjectInspector() {
return inspector;
}

/**
* Gets the hive schema from the serDeProperties, and throws an exception if it is not provided. In the latter case
* it adds the previousException as a root cause.
* @param serDeProperties The source of the hive schema
* @param previousException If we had an exception previously
* @return The hive schema parsed from the serDeProperties
* @throws SerDeException If there is no schema information in the serDeProperties
*/
private static Schema hiveSchemaOrThrow(Properties serDeProperties, Exception previousException)
throws SerDeException {
// Read the configuration parameters
String columnNames = serDeProperties.getProperty(serdeConstants.LIST_COLUMNS);
String columnTypes = serDeProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
String columnNameDelimiter = serDeProperties.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ?
serDeProperties.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
if (columnNames != null && columnTypes != null && columnNameDelimiter != null &&
!columnNames.isEmpty() && !columnTypes.isEmpty() && !columnNameDelimiter.isEmpty()) {
Schema hiveSchema = HiveSchemaUtil.schema(columnNames, columnTypes, columnNameDelimiter);
LOG.info("Using hive schema {}", SchemaParser.toJson(hiveSchema));
return hiveSchema;
} else {
throw new SerDeException("Please provide an existing table or a valid schema", previousException);
}
}
}
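
To illustrate the new fallback path, a hedged sketch of the serde properties that let hiveSchemaOrThrow() build a schema when the table cannot be loaded; the column names and types are invented, the keys are the serdeConstants used above:

import java.util.Properties;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mr.hive.HiveSchemaUtil;

public class SerDeFallbackExample {
  public static void main(String[] args) {
    // Hive normally populates these from the table definition before calling initialize().
    Properties serDeProperties = new Properties();
    serDeProperties.setProperty(serdeConstants.LIST_COLUMNS, "id,data");
    serDeProperties.setProperty(serdeConstants.LIST_COLUMN_TYPES, "bigint:string");
    // COLUMN_NAME_DELIMITER is optional; a comma is assumed when it is missing.

    // Equivalent to what hiveSchemaOrThrow() derives from the properties above:
    Schema fallback = HiveSchemaUtil.schema("id,data", "bigint:string", ",");
    System.out.println(fallback);  // two optional fields: id (long) and data (string)
  }
}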
118 changes: 118 additions & 0 deletions mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaConverter.java
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class HiveSchemaConverter {
private int id;

private HiveSchemaConverter() {
id = 0;
}

static Schema convert(List<String> names, List<TypeInfo> typeInfos) {
HiveSchemaConverter converter = new HiveSchemaConverter();
return new Schema(converter.convertInternal(names, typeInfos));
}

List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> typeInfos) {
List<Types.NestedField> result = new ArrayList<>(names.size());
for (int i = 0; i < names.size(); ++i) {
result.add(Types.NestedField.optional(id++, names.get(i), convert(typeInfos.get(i))));
}

return result;
}

Type convert(TypeInfo typeInfo) {
switch (typeInfo.getCategory()) {
case PRIMITIVE:
switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
case FLOAT:
return Types.FloatType.get();
case DOUBLE:
return Types.DoubleType.get();
case BOOLEAN:
return Types.BooleanType.get();
case BYTE:
case SHORT:
throw new IllegalArgumentException("Unsupported Hive type (" +
((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory() +
") for Iceberg tables. Consider using INT/INTEGER type instead.");
case INT:
return Types.IntegerType.get();
case LONG:
return Types.LongType.get();
case BINARY:
return Types.BinaryType.get();
case CHAR:
case VARCHAR:
throw new IllegalArgumentException("Unsupported Hive type (" +
((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory() +
") for Iceberg tables. Consider using STRING type instead.");
Contributor: I would be fine mapping these to string.

Contributor (Author): Let's discuss this on the dev list where we are talking about the schema mapping. If we decide that the Iceberg schema is the master and we always convert from there to Hive schema, then we can relax the 1-on-1 mapping restriction, and convert multiple Iceberg types to a single Hive type.

(A sketch of such a relaxed mapping appears after the HiveSchemaConverter listing below.)

case STRING:
return Types.StringType.get();
case TIMESTAMP:
return Types.TimestampType.withoutZone();
case DATE:
return Types.DateType.get();
case DECIMAL:
DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
return Types.DecimalType.of(decimalTypeInfo.precision(), decimalTypeInfo.scale());
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
default:
throw new IllegalArgumentException("Unsupported Hive type (" +
((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory() +
") for Iceberg tables.");
}
case STRUCT:
StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
List<Types.NestedField> fields =
convertInternal(structTypeInfo.getAllStructFieldNames(), structTypeInfo.getAllStructFieldTypeInfos());
return Types.StructType.of(fields);
case MAP:
MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
Type keyType = convert(mapTypeInfo.getMapKeyTypeInfo());
Type valueType = convert(mapTypeInfo.getMapValueTypeInfo());
int keyId = id++;
int valueId = id++;
return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
case LIST:
ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
Type listType = convert(listTypeInfo.getListElementTypeInfo());
return Types.ListType.ofOptional(id++, listType);
case UNION:
default:
throw new IllegalArgumentException("Unknown type " + typeInfo.getCategory());
}
}
}
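
Not part of this change: a minimal sketch of the relaxed mapping discussed in the review thread above, where CHAR/VARCHAR would collapse to Iceberg's string type instead of throwing (their length limits would be dropped); the helper class and method names are made up.

import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class RelaxedCharacterMapping {
  // Hypothetical alternative to the CHAR/VARCHAR branch in HiveSchemaConverter.convert().
  static Type convertCharacterLike(PrimitiveTypeInfo typeInfo) {
    switch (typeInfo.getPrimitiveCategory()) {
      case CHAR:
      case VARCHAR:
      case STRING:
        return Types.StringType.get();  // all character-like Hive types map to one Iceberg type
      default:
        throw new IllegalArgumentException("Not a character-like Hive type: " + typeInfo);
    }
  }
}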
71 changes: 71 additions & 0 deletions mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaUtil.java
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveSchemaUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveSchemaUtil.class);

private HiveSchemaUtil() {
}

/**
* Converts the list of Hive FieldSchemas to an Iceberg schema.
* <p>
* The list should contain the columns and the partition columns as well.
* @param fieldSchemas The list of the columns
* @return An equivalent Iceberg Schema
*/
public static Schema schema(List<FieldSchema> fieldSchemas) {
List<String> names = new ArrayList<>(fieldSchemas.size());
List<TypeInfo> typeInfos = new ArrayList<>(fieldSchemas.size());

for (FieldSchema col : fieldSchemas) {
names.add(col.getName());
typeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType()));
}

return HiveSchemaConverter.convert(names, typeInfos);
}

/**
* Converts the Hive properties defining the columns to an Iceberg schema.
* @param columnNames The property containing the column names
* @param columnTypes The property containing the column types
* @param columnNameDelimiter The name delimiter
* @return The Iceberg schema
*/
public static Schema schema(String columnNames, String columnTypes, String columnNameDelimiter) {
// Parse the configuration parameters
List<String> names = new ArrayList<>();
Collections.addAll(names, columnNames.split(columnNameDelimiter));

return HiveSchemaConverter.convert(names, TypeInfoUtils.getTypeInfosFromTypeString(columnTypes));
}
}
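
A short usage sketch of the FieldSchema-based overload; the columns below are invented for illustration:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mr.hive.HiveSchemaUtil;

public class HiveSchemaUtilExample {
  public static void main(String[] args) {
    List<FieldSchema> cols = Arrays.asList(
        new FieldSchema("id", "bigint", null),
        new FieldSchema("event_time", "timestamp", null),
        new FieldSchema("tags", "map<string,string>", null));

    Schema schema = HiveSchemaUtil.schema(cols);
    // Every field comes back optional; field ids are assigned in visit order by
    // HiveSchemaConverter, with map key/value types getting their own ids.
    System.out.println(schema);
  }
}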