
Commit 2ea5621

pan3793 authored and LuciferYang committed
[SPARK-49489][SQL][HIVE] HMS client respects hive.thrift.client.max.message.size
### What changes were proposed in this pull request?

Partly port HIVE-26633 for the Spark HMS client - respect `hive.thrift.client.max.message.size` if present and the value is positive.

> Thrift client configuration for max message size. 0 or -1 will use the default defined in the Thrift library. The upper limit is 2147483648 bytes (or 2gb).

Note: since it's a Hive configuration, I follow the convention of not documenting it on the Spark side.

### Why are the changes needed?

1. THRIFT-5237 (0.14.0) changes the max thrift message size from 2GiB to 100MiB
2. HIVE-25098 (4.0.0) upgrades Thrift from 0.13.0 to 0.14.1
3. HIVE-25996 (2.3.10) backports HIVE-25098 to branch-2.3
4. HIVE-26633 (4.0.0) introduces `hive.thrift.client.max.message.size`
5. SPARK-47018 (4.0.0) upgrades Hive from 2.3.9 to 2.3.10

Thus, Spark's HMS client does not respect `hive.thrift.client.max.message.size` and has a fixed max thrift message size of 100MiB, so users may hit the "MaxMessageSize reached" exception when accessing Hive tables with a large number of partitions. See the discussion in apache#46468 (comment).

### Does this PR introduce _any_ user-facing change?

No, it tackles a regression introduced by an unreleased change, namely SPARK-47018. The added code only takes effect when the user configures `hive.thrift.client.max.message.size` explicitly.

### How was this patch tested?

This must be tested manually, as the current Spark UT does not cover the remote HMS cases. I constructed a test case in a testing Hadoop cluster with a remote HMS.

Firstly, create a table with a large number of partitions.

```
$ spark-sql --num-executors=6 --executor-cores=4 --executor-memory=1g \
    --conf spark.hive.exec.dynamic.partition.mode=nonstrict \
    --conf spark.hive.exec.max.dynamic.partitions=1000000

spark-sql (default)> CREATE TABLE p PARTITIONED BY (year, month, day) STORED AS PARQUET AS
                     SELECT /*+ REPARTITION(200) */ * FROM (
                       (SELECT CAST(id AS STRING) AS year FROM range(2000, 2100)) JOIN
                       (SELECT CAST(id AS STRING) AS month FROM range(1, 13)) JOIN
                       (SELECT CAST(id AS STRING) AS day FROM range(1, 31)) JOIN
                       (SELECT 'this is some data' AS data)
                     );
```

Then tune `hive.thrift.client.max.message.size` and run a query that triggers a `getPartitions` thrift call. For example, when set to `1kb`, it throws `TTransportException: MaxMessageSize reached`, and the exception disappears after boosting the value.

```
$ spark-sql --conf spark.hive.thrift.client.max.message.size=1kb

spark-sql (default)> SHOW PARTITIONS p;
...
2025-02-20 15:18:49 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (1 of 1) after 1s. listPartitionNames
org.apache.thrift.transport.TTransportException: MaxMessageSize reached
    at org.apache.thrift.transport.TEndpointTransport.checkReadBytesAvailable(TEndpointTransport.java:81) ~[libthrift-0.16.0.jar:0.16.0]
    at org.apache.thrift.protocol.TProtocol.checkReadBytesAvailable(TProtocol.java:67) ~[libthrift-0.16.0.jar:0.16.0]
    at org.apache.thrift.protocol.TBinaryProtocol.readListBegin(TBinaryProtocol.java:297) ~[libthrift-0.16.0.jar:0.16.0]
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_names_result$get_partition_names_resultStandardScheme.read(ThriftHiveMetastore.java) ~[hive-metastore-2.3.10.jar:2.3.10]
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_names_result$get_partition_names_resultStandardScheme.read(ThriftHiveMetastore.java) ~[hive-metastore-2.3.10.jar:2.3.10]
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_names_result.read(ThriftHiveMetastore.java) ~[hive-metastore-2.3.10.jar:2.3.10]
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:88) ~[libthrift-0.16.0.jar:0.16.0]
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition_names(ThriftHiveMetastore.java:2458) ~[hive-metastore-2.3.10.jar:2.3.10]
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition_names(ThriftHiveMetastore.java:2443) ~[hive-metastore-2.3.10.jar:2.3.10]
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionNames(HiveMetaStoreClient.java:1487) ~[hive-metastore-2.3.10.jar:2.3.10]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[?:?]
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173) ~[hive-metastore-2.3.10.jar:2.3.10]
    at jdk.proxy2/jdk.proxy2.$Proxy54.listPartitionNames(Unknown Source) ~[?:?]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[?:?]
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2349) ~[hive-metastore-2.3.10.jar:2.3.10]
    at jdk.proxy2/jdk.proxy2.$Proxy54.listPartitionNames(Unknown Source) ~[?:?]
    at org.apache.hadoop.hive.ql.metadata.Hive.getPartitionNames(Hive.java:2461) ~[hive-exec-2.3.10-core.jar:2.3.10]
    at org.apache.spark.sql.hive.client.Shim_v2_0.getPartitionNames(HiveShim.scala:976) ~[spark-hive_2.13-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
    ...
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50022 from pan3793/SPARK-49489.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
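
For application code, here is a minimal sketch (not part of this commit) of how a user might raise the limit through the SparkSession builder; the `spark.hadoop.` prefix mirrors the test added below, and the table name `p` and the `1gb` value are only illustrative.

```scala
import org.apache.spark.sql.SparkSession

object RaiseHmsMaxMessageSize {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .enableHiveSupport()
      // Forwarded to the Hive metastore client via the spark.hadoop.* prefix;
      // per HIVE-26633 semantics, it only takes effect when set to a positive size
      .config("spark.hadoop.hive.thrift.client.max.message.size", "1gb")
      .getOrCreate()

    // Partition-metadata-heavy calls like this are what previously hit the 100MiB Thrift cap
    spark.sql("SHOW PARTITIONS p").show(truncate = false)
  }
}
```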
1 parent e66e20a commit 2ea5621

File tree

3 files changed: +91 -12 lines changed


sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
       .master("local[2]")
       .enableHiveSupport()
       .config("spark.hadoop.hive.metastore.uris", "thrift://127.0.0.1:11111")
+      .config("spark.hadoop.hive.thrift.client.max.message.size", "1gb")
       .getOrCreate()
     val originalLevel = LogManager.getRootLogger.asInstanceOf[Logger].getLevel
     val originalClassLoader = Thread.currentThread().getContextClassLoader

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 13 additions & 8 deletions
@@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException
 import java.util
 import java.util.Locale
 
+import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
@@ -81,14 +82,18 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * Due to classloader isolation issues, pattern matching won't work here so we need
    * to compare the canonical names of the exceptions, which we assume to be stable.
    */
-  private def isClientException(e: Throwable): Boolean = {
-    var temp: Class[_] = e.getClass
-    var found = false
-    while (temp != null && !found) {
-      found = clientExceptions.contains(temp.getCanonicalName)
-      temp = temp.getSuperclass
-    }
-    found
+  @tailrec
+  private def isClientException(e: Throwable): Boolean = e match {
+    case re: RuntimeException if re.getCause != null =>
+      isClientException(re.getCause)
+    case e =>
+      var temp: Class[_] = e.getClass
+      var found = false
+      while (temp != null && !found) {
+        found = clientExceptions.contains(temp.getCanonicalName)
+        temp = temp.getSuperclass
+      }
+      found
   }
 
   /**
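
As a side illustration of the `isClientException` rewrite above, the following self-contained sketch (not from the commit; the `clientExceptions` set here is a hypothetical placeholder for the one in `HiveExternalCatalog`) shows the behavioral difference: an exception wrapped in a `RuntimeException` is now unwrapped via `getCause` before its class hierarchy is matched, so the wrapper no longer hides a client-side error.

```scala
import scala.annotation.tailrec

object ClientExceptionDemo {
  // Hypothetical stand-in for HiveExternalCatalog.clientExceptions
  private val clientExceptions = Set("java.lang.IllegalArgumentException")

  @tailrec
  private def isClientException(e: Throwable): Boolean = e match {
    case re: RuntimeException if re.getCause != null =>
      isClientException(re.getCause) // new: look through RuntimeException wrappers
    case e =>
      // original logic: walk the class hierarchy and compare canonical names
      var temp: Class[_] = e.getClass
      var found = false
      while (temp != null && !found) {
        found = clientExceptions.contains(temp.getCanonicalName)
        temp = temp.getSuperclass
      }
      found
  }

  def main(args: Array[String]): Unit = {
    val wrapped = new RuntimeException(new IllegalArgumentException("bad partition spec"))
    println(isClientException(wrapped)) // true: the cause, not the wrapper, is classified
  }
}
```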

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 77 additions & 4 deletions
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hive.client
 
 import java.io.{OutputStream, PrintStream}
 import java.lang.{Iterable => JIterable}
-import java.lang.reflect.InvocationTargetException
+import java.lang.reflect.{InvocationTargetException, Proxy => JdkProxy}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{HashMap => JHashMap, Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
 
+import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
@@ -33,7 +34,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient, RetryingMetaStoreClient, TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable}
@@ -43,7 +44,9 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.hive.thrift.TFilterTransport
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.thrift.transport.{TEndpointTransport, TTransport}
 
 import org.apache.spark.{SparkConf, SparkException, SparkThrowable}
 import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
@@ -1407,13 +1410,83 @@ private[hive] object HiveClientImpl extends Logging {
       case _ =>
         new HiveConf(conf, classOf[HiveConf])
     }
-    try {
+    val hive = try {
       Hive.getWithoutRegisterFns(hiveConf)
     } catch {
       // SPARK-37069: not all Hive versions have the above method (e.g., Hive 2.3.9 has it but
-      // 2.3.8 don't), therefore here we fallback when encountering the exception.
+      // 2.3.8 doesn't), therefore here we fallback when encountering the exception.
       case _: NoSuchMethodError =>
         Hive.get(hiveConf)
     }
+
+    // Follow behavior of HIVE-26633 (4.0.0), only apply the max message size when
+    // `hive.thrift.client.max.message.size` is set and the value is positive
+    Option(hiveConf.get("hive.thrift.client.max.message.size"))
+      .map(HiveConf.toSizeBytes(_).toInt).filter(_ > 0)
+      .foreach { maxMessageSize =>
+        logDebug(s"Trying to set metastore client thrift max message to $maxMessageSize")
+        configureMaxThriftMessageSize(hiveConf, hive.getMSC, maxMessageSize)
+      }
+
+    hive
+  }
+
+  private def getFieldValue[T](obj: Any, fieldName: String): T = {
+    val field = obj.getClass.getDeclaredField(fieldName)
+    field.setAccessible(true)
+    field.get(obj).asInstanceOf[T]
+  }
+
+  private def getFieldValue[T](obj: Any, clazz: Class[_], fieldName: String): T = {
+    val field = clazz.getDeclaredField(fieldName)
+    field.setAccessible(true)
+    field.get(obj).asInstanceOf[T]
+  }
+
+  // SPARK-49489: a surgery for Hive 2.3.10 due to lack of HIVE-26633
+  private def configureMaxThriftMessageSize(
+      hiveConf: HiveConf, msClient: IMetaStoreClient, maxMessageSize: Int): Unit = try {
+    msClient match {
+      // Hive uses Java Dynamic Proxy to enhance the MetaStoreClient to support synchronization
+      // and retrying, we should unwrap and access the underlying MetaStoreClient instance firstly
+      case proxy if JdkProxy.isProxyClass(proxy.getClass) =>
+        JdkProxy.getInvocationHandler(proxy) match {
+          case syncHandler if syncHandler.getClass.getName.endsWith("SynchronizedHandler") =>
+            val wrappedMsc = getFieldValue[IMetaStoreClient](syncHandler, "client")
+            configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
+          case retryHandler: RetryingMetaStoreClient =>
+            val wrappedMsc = getFieldValue[IMetaStoreClient](retryHandler, "base")
+            configureMaxThriftMessageSize(hiveConf, wrappedMsc, maxMessageSize)
+          case _ =>
+        }
+      case msc: HiveMetaStoreClient if !msc.isLocalMetaStore =>
+        @tailrec
+        def configure(t: TTransport): Unit = t match {
+          // Unwrap and access the underlying TTransport when security enabled (Kerberos)
+          case tTransport: TFilterTransport =>
+            val wrappedTTransport = getFieldValue[TTransport](
+              tTransport, classOf[TFilterTransport], "wrapped")
+            configure(wrappedTTransport)
+          case tTransport: TEndpointTransport =>
+            val tConf = tTransport.getConfiguration
+            val currentMaxMessageSize = tConf.getMaxMessageSize
+            if (currentMaxMessageSize != maxMessageSize) {
+              logDebug("Change the current metastore client thrift max message size from " +
+                s"$currentMaxMessageSize to $maxMessageSize")
+              tConf.setMaxMessageSize(maxMessageSize)
+              // This internally call TEndpointTransport#resetConsumedMessageSize(-1L) to
+              // apply the updated maxMessageSize
+              tTransport.updateKnownMessageSize(0L)
+            }
+          case _ =>
+        }
+        configure(msc.getTTransport)
+      case _ => // do nothing
    }
+  } catch {
+    // TEndpointTransport is added in THRIFT-5237 (0.14.0), for Hive versions that use older
+    // Thrift library (e.g. Hive 2.3.9 uses Thrift 0.9.3), which aren't affected by THRIFT-5237
+    // and don't need to apply HIVE-26633
+    case _: NoClassDefFoundError => // do nothing
   }
 }
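
To make the Thrift side of the change concrete, here is a small standalone sketch (not part of the commit) of the libthrift 0.14+ knobs that `configureMaxThriftMessageSize` manipulates: an endpoint transport such as a `TSocket` carries a `TConfiguration`, and its max message size is what the patch overrides after unwrapping the proxied metastore client. The host `hms-host`, port `9083`, and the 512 MiB value are placeholders.

```scala
import org.apache.thrift.TConfiguration
import org.apache.thrift.transport.TSocket

object ThriftMaxMessageSizeSketch {
  def main(args: Array[String]): Unit = {
    val tConf = new TConfiguration()
    // THRIFT-5237 lowered the default max message size (100 MiB) from the earlier 2 GiB limit
    println(s"default max message size: ${tConf.getMaxMessageSize}")

    // Raise the limit, mirroring what HiveClientImpl now does on the unwrapped HMS transport
    tConf.setMaxMessageSize(512 * 1024 * 1024)

    // A TSocket is a TEndpointTransport, so the configuration travels with the transport;
    // the socket is only constructed here, not opened
    val socket = new TSocket(tConf, "hms-host", 9083)
    println(s"socket max message size: ${socket.getConfiguration.getMaxMessageSize}")
  }
}
```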
