26 changes: 13 additions & 13 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -77,19 +77,19 @@ hadoop-cloud-storage/3.3.4//hadoop-cloud-storage-3.3.4.jar
hadoop-openstack/3.3.4//hadoop-openstack-3.3.4.jar
hadoop-shaded-guava/1.1.1//hadoop-shaded-guava-1.1.1.jar
hadoop-yarn-server-web-proxy/3.3.4//hadoop-yarn-server-web-proxy-3.3.4.jar
hive-beeline/2.3.9//hive-beeline-2.3.9.jar
hive-cli/2.3.9//hive-cli-2.3.9.jar
hive-common/2.3.9//hive-common-2.3.9.jar
hive-exec/2.3.9/core/hive-exec-2.3.9-core.jar
hive-jdbc/2.3.9//hive-jdbc-2.3.9.jar
hive-llap-common/2.3.9//hive-llap-common-2.3.9.jar
hive-metastore/2.3.9//hive-metastore-2.3.9.jar
hive-serde/2.3.9//hive-serde-2.3.9.jar
hive-beeline/2.3.10//hive-beeline-2.3.10.jar
hive-cli/2.3.10//hive-cli-2.3.10.jar
hive-common/2.3.10//hive-common-2.3.10.jar
hive-exec/2.3.10/core/hive-exec-2.3.10-core.jar
hive-jdbc/2.3.10//hive-jdbc-2.3.10.jar
hive-llap-common/2.3.10//hive-llap-common-2.3.10.jar
hive-metastore/2.3.10//hive-metastore-2.3.10.jar
hive-serde/2.3.10//hive-serde-2.3.10.jar
hive-service-rpc/3.1.3//hive-service-rpc-3.1.3.jar
hive-shims-0.23/2.3.9//hive-shims-0.23-2.3.9.jar
hive-shims-common/2.3.9//hive-shims-common-2.3.9.jar
hive-shims-scheduler/2.3.9//hive-shims-scheduler-2.3.9.jar
hive-shims/2.3.9//hive-shims-2.3.9.jar
hive-shims-0.23/2.3.10//hive-shims-0.23-2.3.10.jar
hive-shims-common/2.3.10//hive-shims-common-2.3.10.jar
hive-shims-scheduler/2.3.10//hive-shims-scheduler-2.3.10.jar
hive-shims/2.3.10//hive-shims-2.3.10.jar
hive-storage-api/2.8.1//hive-storage-api-2.8.1.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
@@ -173,7 +173,7 @@ kubernetes-model-storageclass/6.7.2//kubernetes-model-storageclass-6.7.2.jar
lapack/3.0.3//lapack-3.0.3.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
libthrift/0.12.0//libthrift-0.12.0.jar
libthrift/0.16.0//libthrift-0.16.0.jar
log4j-1.2-api/2.20.0//log4j-1.2-api-2.20.0.jar
log4j-api/2.20.0//log4j-api-2.20.0.jar
log4j-core/2.20.0//log4j-core-2.20.0.jar
4 changes: 2 additions & 2 deletions docs/building-spark.md
@@ -83,9 +83,9 @@ Example:

To enable Hive integration for Spark SQL along with its JDBC server and CLI,
add the `-Phive` and `-Phive-thriftserver` profiles to your existing build options.
By default Spark will build with Hive 2.3.9.
By default Spark will build with Hive 2.3.10.

# With Hive 2.3.9 support
# With Hive 2.3.10 support
./build/mvn -Pyarn -Phive -Phive-thriftserver -DskipTests clean package

## Packaging without Hadoop Dependencies for YARN
8 changes: 4 additions & 4 deletions docs/sql-data-sources-hive-tables.md
@@ -127,10 +127,10 @@ The following options can be used to configure the version of Hive that is used
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
<tr>
<td><code>spark.sql.hive.metastore.version</code></td>
<td><code>2.3.9</code></td>
<td><code>2.3.10</code></td>
<td>
Version of the Hive metastore. Available
options are <code>0.12.0</code> through <code>2.3.9</code> and <code>3.0.0</code> through <code>3.1.3</code>.
options are <code>0.12.0</code> through <code>2.3.10</code> and <code>3.0.0</code> through <code>3.1.3</code>.
</td>
<td>1.4.0</td>
</tr>
@@ -142,9 +142,9 @@ The following options can be used to configure the version of Hive that is used
property can be one of four options:
<ol>
<li><code>builtin</code></li>
Use Hive 2.3.9, which is bundled with the Spark assembly when <code>-Phive</code> is
Use Hive 2.3.10, which is bundled with the Spark assembly when <code>-Phive</code> is
enabled. When this option is chosen, <code>spark.sql.hive.metastore.version</code> must be
either <code>2.3.9</code> or not defined.
either <code>2.3.10</code> or not defined.
<li><code>maven</code></li>
Use Hive jars of specified version downloaded from Maven repositories. This configuration
is not generally recommended for production deployments.
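The two properties diffed above work together when Spark talks to an external metastore. A minimal, hedged sketch follows; the Hive 3.1.3 version and jar path are illustrative placeholders, not part of this change:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup: use Hive 3.1.3 metastore jars from a local path instead of
// the builtin Hive 2.3.10 client. Both properties are static and must be set
// before the first SparkSession with Hive support is created.
val spark = SparkSession.builder()
  .appName("external-hms-example")
  .enableHiveSupport()
  .config("spark.sql.hive.metastore.version", "3.1.3")
  .config("spark.sql.hive.metastore.jars", "path")
  .config("spark.sql.hive.metastore.jars.path", "file:///opt/hive-3.1.3/lib/*.jar")
  .getOrCreate()
```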
2 changes: 1 addition & 1 deletion docs/sql-migration-guide.md
@@ -1009,7 +1009,7 @@ Python UDF registration is unchanged.
Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs.
Currently, Hive SerDes and UDFs are based on built-in Hive,
and Spark SQL can be connected to different versions of Hive Metastore
(from 0.12.0 to 2.3.9 and 3.0.0 to 3.1.3. Also see [Interacting with Different Versions of Hive Metastore](sql-data-sources-hive-tables.html#interacting-with-different-versions-of-hive-metastore)).
(from 0.12.0 to 2.3.10 and 3.0.0 to 3.1.3. Also see [Interacting with Different Versions of Hive Metastore](sql-data-sources-hive-tables.html#interacting-with-different-versions-of-hive-metastore)).

#### Deploying in Existing Hive Warehouses
{:.no_toc}
6 changes: 3 additions & 3 deletions pom.xml
@@ -132,8 +132,8 @@
<hive.group>org.apache.hive</hive.group>
<hive.classifier>core</hive.classifier>
<!-- Version used in Maven Hive dependency -->
<hive.version>2.3.9</hive.version>
<hive23.version>2.3.9</hive23.version>
<hive.version>2.3.10</hive.version>
<hive23.version>2.3.10</hive23.version>
<!-- Version used for internal directory structure -->
<hive.version.short>2.3</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
@@ -209,7 +209,7 @@
<joda.version>2.12.5</joda.version>
<jodd.version>3.5.2</jodd.version>
<jsr305.version>3.0.0</jsr305.version>
<libthrift.version>0.12.0</libthrift.version>
<libthrift.version>0.16.0</libthrift.version>
<!-- Please don't upgrade the version to 4.10+, it depends on JDK 11 -->
<antlr4.version>4.9.3</antlr4.version>
<jpam.version>1.1</jpam.version>
@@ -3752,7 +3752,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark

test("SPARK-33084: Add jar support Ivy URI in SQL") {
val sc = spark.sparkContext
val hiveVersion = "2.3.9"
val hiveVersion = "2.3.10"
// transitive=false, only download specified jar
sql(s"ADD JAR ivy://org.apache.hive.hcatalog:hive-hcatalog-core:$hiveVersion?transitive=false")
assert(sc.listJars()
@@ -30,6 +30,7 @@
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public final class KerberosSaslHelper {

@@ -68,8 +69,8 @@ public static TTransport createSubjectAssumedTransport(String principal,
new TSaslClientTransport("GSSAPI", null, names[0], names[1], saslProps, null,
underlyingTransport);
return new TSubjectAssumingTransport(saslTransport);
} catch (SaslException se) {
throw new IOException("Could not instantiate SASL transport", se);
} catch (SaslException | TTransportException se) {
throw new IOException("Could not instantiate transport", se);
}
}

@@ -38,6 +38,7 @@
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;

public final class PlainSaslHelper {
@@ -64,7 +65,7 @@ public static TTransportFactory getPlainTransportFactory(String authTypeStr)
}

public static TTransport getPlainTransport(String username, String password,
TTransport underlyingTransport) throws SaslException {
TTransport underlyingTransport) throws SaslException, TTransportException {
return new TSaslClientTransport("PLAIN", null, null, null, new HashMap<String, String>(),
new PlainCallbackHandler(username, password), underlyingTransport);
}
@@ -45,11 +45,12 @@ public TSetIpAddressProcessor(Iface iface) {
}

@Override
public boolean process(final TProtocol in, final TProtocol out) throws TException {
public void process(final TProtocol in, final TProtocol out) throws TException {
setIpAddress(in);
setUserName(in);
try {
return super.process(in, out);
super.process(in, out);
return;
} finally {
THREAD_LOCAL_USER_NAME.remove();
THREAD_LOCAL_IP_ADDRESS.remove();
@@ -90,16 +90,10 @@ protected void initializeServer() {

// Server args
int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
int requestTimeout = (int) hiveConf.getTimeVar(
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
int beBackoffSlotLength = (int) hiveConf.getTimeVar(
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
.processorFactory(processorFactory).transportFactory(transportFactory)
.protocolFactory(new TBinaryProtocol.Factory())
.inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
.requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
.beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
.executorService(executorService);

// TCP Server
@@ -83,6 +83,16 @@ public void setSessionHandle(SessionHandle sessionHandle) {
public SessionHandle getSessionHandle() {
return sessionHandle;
}

@Override
public <T> T unwrap(Class<T> aClass) {
return null;
}

@Override
public boolean isWrapperFor(Class<?> aClass) {
return false;
}
}

public ThriftCLIService(CLIService service, String serviceName) {
@@ -34,6 +34,7 @@ class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
.master("local[2]")
.enableHiveSupport()
.config("spark.hadoop.hive.metastore.uris", "thrift://127.0.0.1:11111")
.config("spark.hadoop.hive.thrift.client.max.message.size", "1gb")
.getOrCreate()
val originalLevel = LogManager.getRootLogger.asInstanceOf[Logger].getLevel
val originalClassLoader = Thread.currentThread().getContextClassLoader
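The new config line in this test mirrors how an application would opt in. A hedged usage sketch (the value is illustrative; the `spark.hadoop.` prefix forwards the property into the Hive/Hadoop configuration that the Hive client reads):

```scala
import org.apache.spark.sql.SparkSession

// Raise the Hive metastore client's Thrift max message size for this application.
// "1gb" is parsed by HiveConf.toSizeBytes; the Thrift default limit applies otherwise.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.hadoop.hive.thrift.client.max.message.size", "1gb")
  .getOrCreate()
```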
@@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException
import java.util
import java.util.Locale

import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.control.NonFatal

@@ -80,14 +81,18 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* Due to classloader isolation issues, pattern matching won't work here so we need
* to compare the canonical names of the exceptions, which we assume to be stable.
*/
private def isClientException(e: Throwable): Boolean = {
var temp: Class[_] = e.getClass
var found = false
while (temp != null && !found) {
found = clientExceptions.contains(temp.getCanonicalName)
temp = temp.getSuperclass
}
found
@tailrec
private def isClientException(e: Throwable): Boolean = e match {
case re: RuntimeException if re.getCause != null =>
isClientException(re.getCause)
case e =>
var temp: Class[_] = e.getClass
var found = false
while (temp != null && !found) {
found = clientExceptions.contains(temp.getCanonicalName)
temp = temp.getSuperclass
}
found
}

/**
@@ -73,7 +73,7 @@ private[spark] object HiveUtils extends Logging {

val HIVE_METASTORE_VERSION = buildStaticConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
"<code>0.12.0</code> through <code>2.3.9</code> and " +
"<code>0.12.0</code> through <code>2.3.10</code> and " +
"<code>3.0.0</code> through <code>3.1.3</code>.")
.version("1.4.0")
.stringConf
@@ -19,21 +19,29 @@ package org.apache.spark.sql.hive.client

import java.io.PrintStream
import java.lang.{Iterable => JIterable}
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.{InvocationTargetException, Proxy => JdkProxy}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{HashMap => JHashMap, Locale, Map => JMap}
import java.util.concurrent.TimeUnit._

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient, RetryingMetaStoreClient, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable}
@@ -43,7 +51,9 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.thrift.TFilterTransport
import org.apache.hadoop.security.UserGroupInformation
import org.apache.thrift.transport.{TEndpointTransport, TTransport}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
@@ -1349,13 +1359,83 @@ private[hive] object HiveClientImpl extends Logging {
case _ =>
new HiveConf(conf, classOf[HiveConf])
}
try {
val hive = try {
Hive.getWithoutRegisterFns(hiveConf)
} catch {
// SPARK-37069: not all Hive versions have the above method (e.g., Hive 2.3.9 has it but
// SPARK-37069: not all Hive versions have the above method (e.g., Hive 2.3.10 has it but
// 2.3.8 don't), therefore here we fallback when encountering the exception.
case _: NoSuchMethodError =>
Hive.get(hiveConf)
}

// Follow behavior of HIVE-26633 (4.0.0), only apply the max message size when
// `hive.thrift.client.max.message.size` is set and the value is positive
Option(hiveConf.get("hive.thrift.client.max.message.size"))
.map(HiveConf.toSizeBytes(_).toInt).filter(_ > 0)
.foreach { maxMessageSize =>
logDebug(s"Trying to set metastore client thrift max message to $maxMessageSize")
configureMaxThriftMessageSize(hiveConf, hive.getMSC, maxMessageSize)
}

hive
}

private def getFieldValue[T](obj: Any, fieldName: String): T = {
val field = obj.getClass.getDeclaredField(fieldName)
field.setAccessible(true)
field.get(obj).asInstanceOf[T]
}

private def getFieldValue[T](obj: Any, clazz: Class[_], fieldName: String): T = {
val field = clazz.getDeclaredField(fieldName)
field.setAccessible(true)
field.get(obj).asInstanceOf[T]
}

// SPARK-49489: a surgery for Hive 2.3.10 due to lack of HIVE-26633
private def configureMaxThriftMessageSize(
hiveConf: HiveConf, msClient: IMetaStoreClient, maxMessageSize: Int): Unit = try {
msClient match {
// Hive uses Java Dynamic Proxy to enhance the MetaStoreClient to support synchronization
// and retrying, we should unwrap and access the underlying MetaStoreClient instance first
case proxy if JdkProxy.isProxyClass(proxy.getClass) =>
JdkProxy.getInvocationHandler(proxy) match {
case syncHandler if syncHandler.getClass.getName.endsWith("SynchronizedHandler") =>
val wrappedMsc = getFieldValue[IMetaStoreClient](syncHandler, "client")
configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
case retryHandler: RetryingMetaStoreClient =>
val wrappedMsc = getFieldValue[IMetaStoreClient](retryHandler, "base")
configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
case _ =>
}
case msc: HiveMetaStoreClient if !msc.isLocalMetaStore =>
@tailrec
def configure(t: TTransport): Unit = t match {
// Unwrap and access the underlying TTransport when security enabled (Kerberos)
case tTransport: TFilterTransport =>
val wrappedTTransport = getFieldValue[TTransport](
tTransport, classOf[TFilterTransport], "wrapped")
configure(wrappedTTransport)
case tTransport: TEndpointTransport =>
val tConf = tTransport.getConfiguration
val currentMaxMessageSize = tConf.getMaxMessageSize
if (currentMaxMessageSize != maxMessageSize) {
logDebug("Change the current metastore client thrift max message size from " +
s"$currentMaxMessageSize to $maxMessageSize")
tConf.setMaxMessageSize(maxMessageSize)
// This internally calls TEndpointTransport#resetConsumedMessageSize(-1L) to
// apply the updated maxMessageSize
tTransport.updateKnownMessageSize(0L)
}
case _ =>
}
configure(msc.getTTransport)
case _ => // do nothing
}
} catch {
// TEndpointTransport is added in THRIFT-5237 (0.14.0), for Hive versions that use older
// Thrift library (e.g. Hive 2.3.9 uses Thrift 0.9.3), which aren't affected by THRIFT-5237
// and don't need to apply HIVE-26633
case _: NoClassDefFoundError => // do nothing
}
}
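For reference, a self-contained sketch of the two library calls the new `configureMaxThriftMessageSize` helper leans on, assuming libthrift 0.16 and Hive 2.3.x on the classpath. This is illustrative only; the real wiring happens through the reflective unwrapping above:

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.thrift.TConfiguration

// Parse the user-facing size string into bytes, as the patch does.
val maxMessageSize = HiveConf.toSizeBytes("1gb").toInt

// TConfiguration is what a TEndpointTransport consults for its message-size limit
// (Thrift 0.16 defaults to 100 MB); setMaxMessageSize lifts that limit.
val tConf = new TConfiguration()
if (tConf.getMaxMessageSize != maxMessageSize) {
  tConf.setMaxMessageSize(maxMessageSize)
}
assert(tConf.getMaxMessageSize == maxMessageSize)
```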
@@ -101,11 +101,10 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm"))

// Since HIVE-23980, calcite-core included in Hive package jar.
case object v2_3 extends HiveVersion("2.3.9",
case object v2_3 extends HiveVersion("2.3.10",
exclusions = Seq("org.apache.calcite:calcite-core",
"org.apache.calcite:calcite-druid",
"org.apache.calcite.avatica:avatica",
"com.fasterxml.jackson.core:*",
"org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm",
"org.apache.hive:hive-vector-code-gen"))