Skip to content
Merged
66 changes: 66 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,69 @@ project(':iceberg-mr') {
}
}

// Hive 3.x can only be built/tested on JDK8, so the iceberg-hive3 project is
// only wired into the build when running under a JDK8 toolchain.
if (jdkVersion == '8') {
project(':iceberg-hive3') {

// run the tests in iceberg-mr with Hive3 dependencies
// (the iceberg-mr test sources are added alongside this project's own tests)
sourceSets {
test {
java.srcDirs = ['../mr/src/test/java', 'src/test/java']
resources.srcDirs = ['../mr/src/test/resources', 'src/test/resources']
}
}

// exclude these Hive2-specific tests from iceberg-mr
// (Hive3 replacements live in this project's own test sources)
test {
exclude '**/TestIcebergDateObjectInspector.class'
exclude '**/TestIcebergTimestampObjectInspector.class'
}

dependencies {
compile project(':iceberg-api')
compile project(':iceberg-core')
compile project(':iceberg-data')
compile project(':iceberg-hive-metastore')
compile project(':iceberg-orc')
compile project(':iceberg-parquet')
compile project(':iceberg-mr')

// Hadoop 3 client, matching the Hive 3.1.x line; Avro is pinned elsewhere
compileOnly("org.apache.hadoop:hadoop-client:3.1.0") {
exclude group: 'org.apache.avro', module: 'avro'
}

// hive-exec:core classifier avoids the fat jar; the excludes below drop
// transitive dependencies that conflict with versions managed by this build
compileOnly("org.apache.hive:hive-exec:3.1.2:core") {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
exclude group: 'com.google.guava'
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.calcite.avatica'
exclude group: 'org.apache.hive', module: 'hive-llap-tez'
exclude group: 'org.apache.logging.log4j'
exclude group: 'org.pentaho' // missing dependency
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
compileOnly("org.apache.hive:hive-metastore:3.1.2")
compileOnly("org.apache.hive:hive-serde:3.1.2")

// reuse the test fixtures published by the sibling projects
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')

testCompile("org.apache.avro:avro:1.9.2")
testCompile("org.apache.calcite:calcite-core")
testCompile("com.esotericsoftware:kryo-shaded:4.0.2")
testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
// hiverunner drives an embedded HiveServer2 for the SerDe/StorageHandler tests;
// its own hive-exec is excluded so the compileOnly 3.1.2 version above wins
testCompile("com.klarna:hiverunner:6.0.1") {
exclude group: 'javax.jms', module: 'jms'
exclude group: 'org.apache.hive', module: 'hive-exec'
exclude group: 'org.codehaus.jettison', module: 'jettison'
exclude group: 'org.apache.calcite.avatica'
}
}
}
}

project(':iceberg-hive-runtime') {
apply plugin: 'com.github.johnrengelman.shadow'

Expand All @@ -491,6 +554,9 @@ project(':iceberg-hive-runtime') {

dependencies {
compile project(':iceberg-mr')
if (jdkVersion == '8') {
compile project(':iceberg-hive3')
}
}

shadowJar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,19 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.iceberg.common.DynConstructors;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

public class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> {

// use appropriate ctor depending on whether we're working with Hive2 or Hive3 dependencies
// we need to do this because there is a breaking API change between Hive2 and Hive3
private static final DynConstructors.Ctor<HiveMetaStoreClient> CLIENT_CTOR = DynConstructors.builder()
.impl(HiveMetaStoreClient.class, HiveConf.class)
.impl(HiveMetaStoreClient.class, Configuration.class)
.build();

private final HiveConf hiveConf;

HiveClientPool(Configuration conf) {
Expand All @@ -41,7 +50,15 @@ public HiveClientPool(int poolSize, Configuration conf) {
@Override
protected HiveMetaStoreClient newClient() {
try {
return new HiveMetaStoreClient(hiveConf);
try {
return CLIENT_CTOR.newInstance(hiveConf);
} catch (RuntimeException e) {
// any MetaException would be wrapped into RuntimeException during reflection, so let's double-check type here
if (e.getCause() instanceof MetaException) {
throw (MetaException) e.getCause();
}
throw e;
}
} catch (MetaException e) {
throw new RuntimeMetaException(e, "Failed to connect to Hive Metastore");
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

public class MetastoreUtil {

  // DateWritableV2 was introduced in Hive3 and has no Hive2 counterpart, which
  // makes its presence a reliable signal for the Hive major version in use.
  private static final String HIVE3_UNIQUE_CLASS = "org.apache.hadoop.hive.serde2.io.DateWritableV2";

  // Resolved once at class-load time; the classpath cannot change afterwards.
  private static final boolean HIVE3_PRESENT_ON_CLASSPATH = isClassAvailable(HIVE3_UNIQUE_CLASS);

  private MetastoreUtil() {
  }

  /**
   * Returns true if Hive3 dependencies are found on classpath, false otherwise.
   */
  public static boolean hive3PresentOnClasspath() {
    return HIVE3_PRESENT_ON_CLASSPATH;
  }

  /** Reports whether the named class can be loaded by the current classloader. */
  private static boolean isClassAvailable(String className) {
    try {
      Class.forName(className);
    } catch (ClassNotFoundException notFound) {
      return false;
    }
    return true;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
Expand All @@ -47,10 +49,23 @@

public class TestHiveMetastore {

// create the metastore handlers based on whether we're working with Hive2 or Hive3 dependencies
// we need to do this because there is a breaking API change between Hive2 and Hive3
private static final DynConstructors.Ctor<HiveMetaStore.HMSHandler> HMS_HANDLER_CTOR = DynConstructors.builder()
.impl(HiveMetaStore.HMSHandler.class, String.class, Configuration.class)
.impl(HiveMetaStore.HMSHandler.class, String.class, HiveConf.class)
.build();

private static final DynMethods.StaticMethod GET_BASE_HMS_HANDLER = DynMethods.builder("getProxy")
.impl(RetryingHMSHandler.class, Configuration.class, IHMSHandler.class, boolean.class)
.impl(RetryingHMSHandler.class, HiveConf.class, IHMSHandler.class, boolean.class)
.buildStatic();

private File hiveLocalDir;
private HiveConf hiveConf;
private ExecutorService executorService;
private TServer server;
private HiveMetaStore.HMSHandler baseHandler;

public void start() {
try {
Expand Down Expand Up @@ -80,6 +95,9 @@ public void stop() {
if (hiveLocalDir != null) {
hiveLocalDir.delete();
}
if (baseHandler != null) {
baseHandler.shutdown();
}
}

public HiveConf hiveConf() {
Expand All @@ -94,8 +112,8 @@ public String getDatabasePath(String dbName) {
private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exception {
HiveConf serverConf = new HiveConf(conf);
serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true");
HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", serverConf);
IHMSHandler handler = RetryingHMSHandler.getProxy(serverConf, baseHandler, false);
baseHandler = HMS_HANDLER_CTOR.newInstance("new db based metaserver", serverConf);
IHMSHandler handler = GET_BASE_HMS_HANDLER.invoke(serverConf, baseHandler, false);

TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket)
.processor(new TSetIpAddressProcessor<>(handler))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive.serde.objectinspector;

import java.time.LocalDate;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.iceberg.util.DateTimeUtil;

/**
 * Hive3 {@link DateObjectInspector} for Iceberg date values, which arrive as
 * {@link LocalDate}. Hive3 replaced {@code java.sql.Date} with its own
 * {@code org.apache.hadoop.hive.common.type.Date}, hence this separate inspector.
 */
public final class IcebergDateObjectInspectorHive3 extends AbstractPrimitiveJavaObjectInspector
    implements DateObjectInspector {

  private static final IcebergDateObjectInspectorHive3 INSTANCE = new IcebergDateObjectInspectorHive3();

  /** Returns the shared singleton instance; the inspector is stateless. */
  public static IcebergDateObjectInspectorHive3 get() {
    return INSTANCE;
  }

  private IcebergDateObjectInspectorHive3() {
    super(TypeInfoFactory.dateTypeInfo);
  }

  @Override
  public Date getPrimitiveJavaObject(Object o) {
    // Iceberg hands dates over as LocalDate; convert via epoch-day to Hive's Date.
    return o == null ? null : Date.ofEpochDay(DateTimeUtil.daysFromDate((LocalDate) o));
  }

  @Override
  public DateWritableV2 getPrimitiveWritableObject(Object o) {
    if (o == null) {
      return null;
    }
    LocalDate localDate = (LocalDate) o;
    return new DateWritableV2(DateTimeUtil.daysFromDate(localDate));
  }

  @Override
  public Object copyObject(Object o) {
    // Hive's Date is mutable, so hand back an independent copy.
    if (o == null) {
      return null;
    }
    return new Date((Date) o);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive.serde.objectinspector;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

/**
 * Hive3 {@link TimestampObjectInspector} for Iceberg timestamp values. Iceberg
 * represents timestamps with a zone as {@link OffsetDateTime} and without a zone
 * as {@link LocalDateTime}; the two singleton variants below differ only in how
 * they normalize the incoming object to a {@link LocalDateTime}.
 */
public abstract class IcebergTimestampObjectInspectorHive3 extends AbstractPrimitiveJavaObjectInspector
    implements TimestampObjectInspector {

  private static final IcebergTimestampObjectInspectorHive3 INSTANCE_WITH_ZONE =
      new IcebergTimestampObjectInspectorHive3() {
        @Override
        LocalDateTime toLocalDateTime(Object o) {
          // drop the offset; the value is already adjusted to UTC upstream
          return ((OffsetDateTime) o).toLocalDateTime();
        }
      };

  private static final IcebergTimestampObjectInspectorHive3 INSTANCE_WITHOUT_ZONE =
      new IcebergTimestampObjectInspectorHive3() {
        @Override
        LocalDateTime toLocalDateTime(Object o) {
          return (LocalDateTime) o;
        }
      };

  /**
   * Returns the singleton matching the Iceberg timestamp type.
   *
   * @param adjustToUTC true for timestamptz (values arrive as OffsetDateTime)
   */
  public static IcebergTimestampObjectInspectorHive3 get(boolean adjustToUTC) {
    return adjustToUTC ? INSTANCE_WITH_ZONE : INSTANCE_WITHOUT_ZONE;
  }

  private IcebergTimestampObjectInspectorHive3() {
    super(TypeInfoFactory.timestampTypeInfo);
  }

  /** Normalizes the raw Iceberg value to a LocalDateTime; variant-specific. */
  abstract LocalDateTime toLocalDateTime(Object object);

  @Override
  public Timestamp getPrimitiveJavaObject(Object o) {
    if (o == null) {
      return null;
    }
    LocalDateTime local = toLocalDateTime(o);
    long epochMillis = local.toInstant(ZoneOffset.UTC).toEpochMilli();
    Timestamp result = Timestamp.ofEpochMilli(epochMillis);
    // restore full nanosecond precision lost by the millisecond conversion
    result.setNanos(local.getNano());
    return result;
  }

  @Override
  public TimestampWritableV2 getPrimitiveWritableObject(Object o) {
    Timestamp javaObject = getPrimitiveJavaObject(o);
    return javaObject == null ? null : new TimestampWritableV2(javaObject);
  }

  @Override
  public Object copyObject(Object o) {
    if (o == null) {
      return null;
    }

    // Hive's Timestamp is mutable; copy it and carry the nanos over explicitly
    Timestamp source = (Timestamp) o;
    Timestamp duplicate = new Timestamp(source);
    duplicate.setNanos(source.getNanos());
    return duplicate;
  }

}
Loading