diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index b4034c171fd3..608d095b7c62 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -77,19 +77,19 @@ hadoop-cloud-storage/3.3.4//hadoop-cloud-storage-3.3.4.jar
hadoop-openstack/3.3.4//hadoop-openstack-3.3.4.jar
hadoop-shaded-guava/1.1.1//hadoop-shaded-guava-1.1.1.jar
hadoop-yarn-server-web-proxy/3.3.4//hadoop-yarn-server-web-proxy-3.3.4.jar
-hive-beeline/2.3.9//hive-beeline-2.3.9.jar
-hive-cli/2.3.9//hive-cli-2.3.9.jar
-hive-common/2.3.9//hive-common-2.3.9.jar
-hive-exec/2.3.9/core/hive-exec-2.3.9-core.jar
-hive-jdbc/2.3.9//hive-jdbc-2.3.9.jar
-hive-llap-common/2.3.9//hive-llap-common-2.3.9.jar
-hive-metastore/2.3.9//hive-metastore-2.3.9.jar
-hive-serde/2.3.9//hive-serde-2.3.9.jar
+hive-beeline/2.3.10//hive-beeline-2.3.10.jar
+hive-cli/2.3.10//hive-cli-2.3.10.jar
+hive-common/2.3.10//hive-common-2.3.10.jar
+hive-exec/2.3.10/core/hive-exec-2.3.10-core.jar
+hive-jdbc/2.3.10//hive-jdbc-2.3.10.jar
+hive-llap-common/2.3.10//hive-llap-common-2.3.10.jar
+hive-metastore/2.3.10//hive-metastore-2.3.10.jar
+hive-serde/2.3.10//hive-serde-2.3.10.jar
hive-service-rpc/3.1.3//hive-service-rpc-3.1.3.jar
-hive-shims-0.23/2.3.9//hive-shims-0.23-2.3.9.jar
-hive-shims-common/2.3.9//hive-shims-common-2.3.9.jar
-hive-shims-scheduler/2.3.9//hive-shims-scheduler-2.3.9.jar
-hive-shims/2.3.9//hive-shims-2.3.9.jar
+hive-shims-0.23/2.3.10//hive-shims-0.23-2.3.10.jar
+hive-shims-common/2.3.10//hive-shims-common-2.3.10.jar
+hive-shims-scheduler/2.3.10//hive-shims-scheduler-2.3.10.jar
+hive-shims/2.3.10//hive-shims-2.3.10.jar
hive-storage-api/2.8.1//hive-storage-api-2.8.1.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
@@ -173,7 +173,7 @@ kubernetes-model-storageclass/6.7.2//kubernetes-model-storageclass-6.7.2.jar
lapack/3.0.3//lapack-3.0.3.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
-libthrift/0.12.0//libthrift-0.12.0.jar
+libthrift/0.16.0//libthrift-0.16.0.jar
log4j-1.2-api/2.20.0//log4j-1.2-api-2.20.0.jar
log4j-api/2.20.0//log4j-api-2.20.0.jar
log4j-core/2.20.0//log4j-core-2.20.0.jar
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 4f626b4ff58c..d8319fb59c4d 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -83,9 +83,9 @@ Example:
To enable Hive integration for Spark SQL along with its JDBC server and CLI,
add the `-Phive` and `-Phive-thriftserver` profiles to your existing build options.
-By default Spark will build with Hive 2.3.9.
+By default Spark will build with Hive 2.3.10.
- # With Hive 2.3.9 support
+ # With Hive 2.3.10 support
./build/mvn -Pyarn -Phive -Phive-thriftserver -DskipTests clean package
## Packaging without Hadoop Dependencies for YARN
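For SBT users, the same profile flags apply to Spark's SBT build documented in building-spark.md; a hedged equivalent of the Maven command above is `./build/sbt -Pyarn -Phive -Phive-thriftserver clean package`.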
diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md
index 13cd8fc2cc05..0bf141c5e28d 100644
--- a/docs/sql-data-sources-hive-tables.md
+++ b/docs/sql-data-sources-hive-tables.md
@@ -127,10 +127,10 @@ The following options can be used to configure the version of Hive that is used
  <thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
  <tr>
    <td><code>spark.sql.hive.metastore.version</code></td>
-    <td><code>2.3.9</code></td>
+    <td><code>2.3.10</code></td>
    <td>
      Version of the Hive metastore. Available
-      options are <code>0.12.0</code> through <code>2.3.9</code> and <code>3.0.0</code> through <code>3.1.3</code>.
+      options are <code>0.12.0</code> through <code>2.3.10</code> and <code>3.0.0</code> through <code>3.1.3</code>.
    </td>
    <td>1.4.0</td>
  </tr>
      <li><code>builtin</code></li>
-      Use Hive 2.3.9, which is bundled with the Spark assembly when <code>-Phive</code> is
+      Use Hive 2.3.10, which is bundled with the Spark assembly when <code>-Phive</code> is
      enabled. When this option is chosen, <code>spark.sql.hive.metastore.version</code> must be
-      either <code>2.3.9</code> or not defined.
+      either <code>2.3.10</code> or not defined.
      <li><code>maven</code></li>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
    .doc("Version of the Hive metastore. Available options are " +
-      "0.12.0 through 2.3.9 and " +
+      "0.12.0 through 2.3.10 and " +
      "3.0.0 through 3.1.3.")
    .version("1.4.0")
    .stringConf
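To make the new defaults concrete, here is a minimal sketch of selecting the bundled Hive 2.3.10 client from user code (the app name is illustrative; the keys and accepted values come from the table above):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: pin the metastore client to the Hive 2.3.10 bundled via -Phive.
// Accepted versions are 0.12.0 through 2.3.10 and 3.0.0 through 3.1.3.
val spark = SparkSession.builder()
  .appName("hive-metastore-example") // hypothetical app name
  .config("spark.sql.hive.metastore.version", "2.3.10")
  .config("spark.sql.hive.metastore.jars", "builtin")
  .enableHiveSupport()
  .getOrCreate()
```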
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index e51658355b10..8aa8b26d6e77 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -19,12 +19,16 @@ package org.apache.spark.sql.hive.client
import java.io.PrintStream
import java.lang.{Iterable => JIterable}
-import java.lang.reflect.InvocationTargetException
+import java.lang.reflect.{InvocationTargetException, Proxy => JdkProxy}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{HashMap => JHashMap, Locale, Map => JMap}
import java.util.concurrent.TimeUnit._
+import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -32,8 +36,12 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient, RetryingMetaStoreClient, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable}
@@ -43,7 +51,9 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.hive.thrift.TFilterTransport
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.thrift.transport.{TEndpointTransport, TTransport}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
@@ -1349,13 +1359,83 @@ private[hive] object HiveClientImpl extends Logging {
case _ =>
new HiveConf(conf, classOf[HiveConf])
}
- try {
+ val hive = try {
Hive.getWithoutRegisterFns(hiveConf)
} catch {
- // SPARK-37069: not all Hive versions have the above method (e.g., Hive 2.3.9 has it but
+ // SPARK-37069: not all Hive versions have the above method (e.g., Hive 2.3.10 has it but
// 2.3.8 don't), therefore here we fallback when encountering the exception.
case _: NoSuchMethodError =>
Hive.get(hiveConf)
}
+
+    // Follow the behavior of HIVE-26633 (Hive 4.0.0): only apply the max message size when
+    // `hive.thrift.client.max.message.size` is set and the value is positive.
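+    // For illustration (an assumption about usage, not part of the Hive behavior above):
+    // the setting is typically surfaced through Spark's Hadoop conf passthrough, e.g.
+    //   --conf spark.hadoop.hive.thrift.client.max.message.size=1gb
+    // and the size string is parsed by HiveConf.toSizeBytes below.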
+ Option(hiveConf.get("hive.thrift.client.max.message.size"))
+ .map(HiveConf.toSizeBytes(_).toInt).filter(_ > 0)
+ .foreach { maxMessageSize =>
+        logDebug(s"Trying to set metastore client thrift max message size to $maxMessageSize")
+ configureMaxThriftMessageSize(hiveConf, hive.getMSC, maxMessageSize)
+ }
+
+ hive
+ }
+
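+  // Reflectively reads a (possibly private) field value, bypassing Java access checks;
+  // used below to peel apart the layered metastore client wrappers.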
+ private def getFieldValue[T](obj: Any, fieldName: String): T = {
+ val field = obj.getClass.getDeclaredField(fieldName)
+ field.setAccessible(true)
+ field.get(obj).asInstanceOf[T]
+ }
+
+ private def getFieldValue[T](obj: Any, clazz: Class[_], fieldName: String): T = {
+ val field = clazz.getDeclaredField(fieldName)
+ field.setAccessible(true)
+ field.get(obj).asInstanceOf[T]
+ }
+
+  // SPARK-49489: a targeted workaround for Hive 2.3.10, which lacks HIVE-26633
+ private def configureMaxThriftMessageSize(
+ hiveConf: HiveConf, msClient: IMetaStoreClient, maxMessageSize: Int): Unit = try {
+ msClient match {
+      // Hive uses JDK dynamic proxies to enhance the MetaStoreClient with synchronization
+      // and retrying, so unwrap the proxies first to reach the underlying MetaStoreClient
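+      // (the handlers unwrapped here are Hive's SynchronizedHandler and
+      // RetryingMetaStoreClient; the recursion handles either nesting order)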
+ case proxy if JdkProxy.isProxyClass(proxy.getClass) =>
+ JdkProxy.getInvocationHandler(proxy) match {
+ case syncHandler if syncHandler.getClass.getName.endsWith("SynchronizedHandler") =>
+ val wrappedMsc = getFieldValue[IMetaStoreClient](syncHandler, "client")
+ configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
+ case retryHandler: RetryingMetaStoreClient =>
+ val wrappedMsc = getFieldValue[IMetaStoreClient](retryHandler, "base")
+ configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
+ case _ =>
+ }
+ case msc: HiveMetaStoreClient if !msc.isLocalMetaStore =>
+ @tailrec
+ def configure(t: TTransport): Unit = t match {
+          // Unwrap and access the underlying TTransport when security is enabled (Kerberos)
+ case tTransport: TFilterTransport =>
+ val wrappedTTransport = getFieldValue[TTransport](
+ tTransport, classOf[TFilterTransport], "wrapped")
+ configure(wrappedTTransport)
+ case tTransport: TEndpointTransport =>
+ val tConf = tTransport.getConfiguration
+ val currentMaxMessageSize = tConf.getMaxMessageSize
+ if (currentMaxMessageSize != maxMessageSize) {
+ logDebug("Change the current metastore client thrift max message size from " +
+ s"$currentMaxMessageSize to $maxMessageSize")
+ tConf.setMaxMessageSize(maxMessageSize)
+              // This internally calls TEndpointTransport#resetConsumedMessageSize(-1L) to
+              // apply the updated maxMessageSize
+ tTransport.updateKnownMessageSize(0L)
+ }
+ case _ =>
+ }
+ configure(msc.getTTransport)
+ case _ => // do nothing
+ }
+ } catch {
+    // TEndpointTransport was added in THRIFT-5237 (0.14.0). Hive versions built against an
+    // older Thrift library (e.g. Hive 2.3.9 declares Thrift 0.9.3) lack that class, so they
+    // aren't affected by THRIFT-5237 and don't need HIVE-26633; in that case, do nothing.
+ case _: NoClassDefFoundError => // do nothing
}
}
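For background on what the cap enforces: since THRIFT-5237 (libthrift 0.14.0, hence the bump to 0.16.0 in this diff), every `TEndpointTransport` checks `TConfiguration.maxMessageSize` on the read path. A minimal, self-contained sketch of that Thrift-side mechanism, with an illustrative limit and a hypothetical metastore endpoint:

```scala
import org.apache.thrift.TConfiguration
import org.apache.thrift.transport.TSocket

// Reads over this transport enforce a 1 GiB message cap (illustrative value);
// an oversized response fails fast instead of buffering without bound.
val tConf = new TConfiguration(
  1024 * 1024 * 1024,                     // maxMessageSize
  TConfiguration.DEFAULT_MAX_FRAME_SIZE,  // maxFrameSize (library default)
  TConfiguration.DEFAULT_RECURSION_DEPTH) // recursionLimit (library default)
val transport = new TSocket(tConf, "hms.example.com", 9083) // hypothetical endpoint
```

`configureMaxThriftMessageSize` above takes the complementary, after-the-fact route: it digs the live `TEndpointTransport` out of an already-built client, raises `maxMessageSize` on its `TConfiguration`, and calls `updateKnownMessageSize(0L)` so the new limit is applied.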
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index eb69f23d2876..16f85b9b7a33 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -101,11 +101,10 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm"))
// Since HIVE-23980, calcite-core included in Hive package jar.
- case object v2_3 extends HiveVersion("2.3.9",
+ case object v2_3 extends HiveVersion("2.3.10",
exclusions = Seq("org.apache.calcite:calcite-core",
"org.apache.calcite:calcite-druid",
"org.apache.calcite.avatica:avatica",
- "com.fasterxml.jackson.core:*",
"org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm",
"org.apache.hive:hive-vector-code-gen"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index bfa6c8c3838d..19ada1df0490 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -220,7 +220,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
tryDownloadSpark(version, sparkTestingDir.getCanonicalPath)
}
- // Extract major.minor for testing Spark 3.1.x and 3.0.x with metastore 2.3.9 and Java 11.
+ // Extract major.minor for testing Spark 3.1.x and 3.0.x with metastore 2.3.10 and Java 11.
val hiveMetastoreVersion = """^\d+\.\d+""".r.findFirstIn(hiveVersion).get
val args = Seq(
"--name", "prepare testing tables",