From 1d13c97121c412b499879d8bc271820dc58619fc Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 11 Nov 2024 17:33:08 +0800 Subject: [PATCH 1/9] [SPARK-45265][SQL] Support Hive Metastore Server 4.0 --- .../hive/thriftserver/SparkSQLDriver.scala | 6 +- .../spark/sql/hive/HiveExternalCatalog.scala | 2 +- .../org/apache/spark/sql/hive/HiveUtils.scala | 5 +- .../spark/sql/hive/client/HiveClient.scala | 3 +- .../sql/hive/client/HiveClientImpl.scala | 53 ++-- .../spark/sql/hive/client/HiveShim.scala | 292 +++++++++++++++++- .../hive/client/IsolatedClientLoader.scala | 1 + .../spark/sql/hive/client/package.scala | 16 +- .../sql/hive/client/HiveClientSuite.scala | 12 +- .../sql/hive/client/HiveClientVersions.scala | 2 +- .../client/HivePartitionFilteringSuite.scala | 3 +- .../sql/hive/client/HiveVersionSuite.scala | 11 +- 12 files changed, 349 insertions(+), 57 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 650a5df340215..08f5d63bba1a6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.COMMAND import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.CommandResult -import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} +import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution} import org.apache.spark.sql.execution.HiveResult.hiveResultString import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} @@ -82,10 +82,10 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv. 
} catch { case st: SparkThrowable => logDebug(s"Failed in [$command]", st) - new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(st), st.getSqlState, st) + throw new QueryExecutionException(ExceptionUtils.getStackTrace(st)) case cause: Throwable => logError(log"Failed in [${MDC(COMMAND, command)}]", cause) - new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause) + throw new QueryExecutionException(ExceptionUtils.getStackTrace(cause)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 56406fc5faabd..194572be2a64d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -1030,7 +1030,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } val metaStoreParts = partsWithLocation .map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec))) - client.createPartitions(db, table, metaStoreParts, ignoreIfExists) + client.createPartitions(tableMeta, metaStoreParts, ignoreIfExists) } override def dropPartitions( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 30201dcee552d..478f486eeb213 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -74,8 +74,9 @@ private[spark] object HiveUtils extends Logging { val HIVE_METASTORE_VERSION = buildStaticConf("spark.sql.hive.metastore.version") .doc("Version of the Hive metastore. Available options are " + - "2.0.0 through 2.3.10 and " + - "3.0.0 through 3.1.3.") + "2.0.0 through 2.3.10, " + + "3.0.0 through 3.1.3 and " + + "4.0.0 through 4.0.1.") .version("1.4.0") .stringConf .checkValue(isCompatibleHiveVersion, "Unsupported Hive Metastore version") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index ff225e7a50f0d..3d89f31e1965b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -164,8 +164,7 @@ private[hive] trait HiveClient { * Create one or many partitions in the given table. */ def createPartitions( - db: String, - table: String, + table: CatalogTable, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 735814c9ae084..3adf331366149 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.client -import java.io.PrintStream +import java.io.{OutputStream, PrintStream} import java.lang.{Iterable => JIterable} import java.lang.reflect.InvocationTargetException import java.nio.charset.StandardCharsets.UTF_8 @@ -121,6 +121,7 @@ private[hive] class HiveClientImpl( case hive.v2_3 => new Shim_v2_3() case hive.v3_0 => new Shim_v3_0() case hive.v3_1 => new Shim_v3_1() + case hive.v4_0 => new Shim_v4_0() } // Create an internal session state for this HiveClientImpl. 
@@ -177,8 +178,10 @@ private[hive] class HiveClientImpl( // got changed. We reset it to clientLoader.ClassLoader here. state.getConf.setClassLoader(clientLoader.classLoader) shim.setCurrentSessionState(state) - state.out = new PrintStream(outputBuffer, true, UTF_8.name()) - state.err = new PrintStream(outputBuffer, true, UTF_8.name()) + val clz = state.getClass.getField("out").getType.asInstanceOf[Class[_ <: PrintStream]] + val ctor = clz.getConstructor(classOf[OutputStream], classOf[Boolean], classOf[String]) + state.getClass.getField("out").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name())) + state.getClass.getField("err").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name())) state } @@ -307,15 +310,27 @@ private[hive] class HiveClientImpl( } def setOut(stream: PrintStream): Unit = withHiveState { - state.out = stream + val ctor = state.getClass.getField("out") + .getType + .asInstanceOf[Class[_ <: PrintStream]] + .getConstructor(classOf[OutputStream]) + state.getClass.getField("out").set(state, ctor.newInstance(stream)) } def setInfo(stream: PrintStream): Unit = withHiveState { - state.info = stream + val ctor = state.getClass.getField("info") + .getType + .asInstanceOf[Class[_ <: PrintStream]] + .getConstructor(classOf[OutputStream]) + state.getClass.getField("info").set(state, ctor.newInstance(stream)) } def setError(stream: PrintStream): Unit = withHiveState { - state.err = stream + val ctor = state.getClass.getField("err") + .getType + .asInstanceOf[Class[_ <: PrintStream]] + .getConstructor(classOf[OutputStream]) + state.getClass.getField("err").set(state, ctor.newInstance(stream)) } private def setCurrentDatabaseRaw(db: String): Unit = { @@ -629,21 +644,22 @@ private[hive] class HiveClientImpl( } override def createPartitions( - db: String, - table: String, + table: CatalogTable, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = withHiveState { def replaceExistException(e: Throwable): Unit = e match { case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] => - val hiveTable = client.getTable(db, table) + val db = table.identifier.database.getOrElse(state.getCurrentDatabase) + val tableName = table.identifier.table + val hiveTable = client.getTable(db, tableName) val existingParts = parts.filter { p => shim.getPartitions(client, hiveTable, p.spec.asJava).nonEmpty } - throw new PartitionsAlreadyExistException(db, table, existingParts.map(_.spec)) + throw new PartitionsAlreadyExistException(db, tableName, existingParts.map(_.spec)) case _ => throw e } try { - shim.createPartitions(client, db, table, parts, ignoreIfExists) + shim.createPartitions(client, toHiveTable(table), parts, ignoreIfExists) } catch { case e: InvocationTargetException => replaceExistException(e.getCause) case e: Throwable => replaceExistException(e) @@ -891,17 +907,14 @@ private[hive] class HiveClientImpl( results case _ => - if (state.out != null) { + val out = state.getClass.getField("out").get(state) + if (out != null) { // scalastyle:off println - state.out.println(tokens(0) + " " + cmd_1) + out.asInstanceOf[PrintStream].println(tokens(0) + " " + cmd_1) // scalastyle:on println } - val response: CommandProcessorResponse = proc.run(cmd_1) - // Throw an exception if there is an error in query processing. 
- if (response.getResponseCode != 0) { - throw new QueryExecutionException(response.getErrorMessage) - } - Seq(response.getResponseCode.toString) + proc.run(cmd_1) + Seq("0") } } catch { case e: Exception => @@ -971,7 +984,7 @@ private[hive] class HiveClientImpl( partSpec, replace, numDP, - listBucketingEnabled = hiveTable.isStoredAsSubDirectories) + hiveTable) } override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index c03fed4cc3184..32d9ca8373510 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{IMetaStoreClient, PartitionDropOptions, TableType} @@ -32,7 +33,7 @@ import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Funct import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.io.AcidUtils import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table} -import org.apache.hadoop.hive.ql.plan.AddPartitionDesc +import org.apache.hadoop.hive.ql.plan.{AddPartitionDesc, DynamicPartitionCtx} import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory} import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants @@ -42,15 +43,16 @@ import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2, CONFIG3} import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow} import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException -import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, CatalogTablePartition, CatalogUtils, ExternalCatalogUtils, FunctionResource, FunctionResourceType} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateFormatter, TypeUtils} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{AtomicType, DateType, IntegralType, IntegralTypeExpression, StringType} +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils /** * A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used @@ -161,8 +163,7 @@ private[client] sealed abstract class Shim { def createPartitions( hive: Hive, - dbName: String, - tableName: String, + table: Table, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit @@ -196,7 +197,7 @@ private[client] sealed abstract class Shim { partSpec: JMap[String, String], replace: Boolean, numDP: Int, - listBucketingEnabled: Boolean): Unit + hiveTable: Table): Unit def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit @@ -318,11 +319,10 @@ private[client] class Shim_v2_0 
extends Shim with Logging { override def createPartitions( hive: Hive, - dbName: String, - tableName: String, + table: Table, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = { - val addPartitionDesc = new AddPartitionDesc(dbName, tableName, ignoreIfExists) + val addPartitionDesc = new AddPartitionDesc(table.getDbName, table.getTableName, ignoreIfExists) parts.zipWithIndex.foreach { case (s, i) => addPartitionDesc.addPartition( s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString).orNull) @@ -504,8 +504,9 @@ private[client] class Shim_v2_0 extends Shim with Logging { partSpec: JMap[String, String], replace: Boolean, numDP: Int, - listBucketingEnabled: Boolean): Unit = { + hiveTable: Table): Unit = { recordHiveCall() + val listBucketingEnabled = hiveTable.isStoredAsSubDirectories loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions) } @@ -1096,8 +1097,9 @@ private[client] class Shim_v2_1 extends Shim_v2_0 { partSpec: JMap[String, String], replace: Boolean, numDP: Int, - listBucketingEnabled: Boolean): Unit = { + hiveTable: Table): Unit = { recordHiveCall() + val listBucketingEnabled = hiveTable.isStoredAsSubDirectories loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID) @@ -1234,7 +1236,8 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { loadPartitionMethod.invoke(hive, loadPath, table, partSpec, loadFileType.get, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean, isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask, - writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean) + writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean + ) } override def loadTable( @@ -1262,7 +1265,7 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { partSpec: JMap[String, String], replace: Boolean, numDP: Int, - listBucketingEnabled: Boolean): Unit = { + hiveTable: Table): Unit = { val loadFileType = if (replace) { clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL")) } else { @@ -1278,3 +1281,266 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { } private[client] class Shim_v3_1 extends Shim_v3_0 + +private[client] class Shim_v4_0 extends Shim_v3_1 { + + private lazy val addPartitionsMethod = + findMethod( + classOf[Hive], + "addPartitions", + classOf[JList[Partition]], + JBoolean.TYPE, + JBoolean.TYPE) // needResults + + private lazy val alterTableMethod = + findMethod( + classOf[Hive], + "alterTable", + classOf[String], + classOf[Table], + classOf[EnvironmentContext], + JBoolean.TYPE) + + private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass( + "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType") + + override def alterTable(hive: Hive, tableName: String, table: Table): Unit = { + recordHiveCall() + val transactional = false + alterTableMethod.invoke( + hive, + tableName, + table, + environmentContextInAlterTable, + transactional: JBoolean + ) + } + + private lazy val loadTableMethod = + findMethod( + classOf[Hive], + "loadTable", + classOf[Path], + classOf[String], + clazzLoadFileType, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + classOf[JLong], + JInteger.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) + + private lazy val 
clazzLoadTableDesc = getClass.getClassLoader.loadClass( + "org.apache.hadoop.hive.ql.plan.LoadTableDesc") + + private lazy val clazzPartitionDetails = + Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities$PartitionDesc") + private lazy val alterPartitionsMethod = + findMethod( + classOf[Hive], + "alterPartitions", + classOf[String], + classOf[JList[Partition]], + classOf[EnvironmentContext], + JBoolean.TYPE) + private lazy val loadPartitionMethod = + findMethod( + classOf[Hive], + "loadPartition", + classOf[Path], + classOf[Table], + classOf[JMap[String, String]], + clazzLoadFileType, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + classOf[JLong], + JInteger.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) + + private lazy val loadDynamicPartitionsMethod = + findMethod( + classOf[Hive], + "loadDynamicPartitions", + clazzLoadTableDesc, + JInteger.TYPE, // numLB number of buckets + JBoolean.TYPE, // isAcid true if this is an ACID operation + JLong.TYPE, // writeId, can be 0 unless isAcid == true + JInteger.TYPE, // stmtId + JBoolean.TYPE, // resetStatistics + classOf[AcidUtils.Operation], + classOf[JMap[Path, clazzPartitionDetails.type]]) + + private lazy val renamePartitionMethod = + findMethod( + classOf[Hive], + "renamePartition", + classOf[Table], + classOf[JMap[String, String]], + classOf[Partition], + JLong.TYPE) + + override def loadTable( + hive: Hive, + loadPath: Path, + tableName: String, + replace: Boolean, + isSrcLocal: Boolean): Unit = { + val loadFileType = if (replace) { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL")) + } else { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING")) + } + assert(loadFileType.isDefined) + recordHiveCall() + val resetStatistics = false + val isDirectInsert = false + loadTableMethod.invoke( + hive, + loadPath, + tableName, + loadFileType.get, + isSrcLocal: JBoolean, + isSkewedStoreAsSubdir, + isAcidIUDoperation, + resetStatistics, + writeIdInLoadTableOrPartition, + stmtIdInLoadTableOrPartition: JInteger, + replace: JBoolean, + isDirectInsert: JBoolean) + } + + override def alterPartitions( + hive: Hive, + tableName: String, + newParts: JList[Partition]): Unit = { + recordHiveCall() + val transactional: JBoolean = false + alterPartitionsMethod.invoke( + hive, + tableName, + newParts, + environmentContextInAlterTable, + transactional) + } + override def createPartitions( + hive: Hive, + table: Table, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = { + val partitions = parts.map(HiveClientImpl.toHivePartition(_, table).getTPartition).asJava + recordHiveCall() + val needResults = false + addPartitionsMethod.invoke(hive, partitions, ignoreIfExists, needResults: JBoolean) + } + + override def loadPartition( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean, + isSrcLocal: Boolean): Unit = { + recordHiveCall() + val table = hive.getTable(tableName) + val loadFileType = if (replace) { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL")) + } else { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING")) + } + assert(loadFileType.isDefined) + val inheritLocation: JBoolean = false + val isDirectInsert: JBoolean = false + recordHiveCall() + loadPartitionMethod.invoke( + hive, + loadPath, + table, + partSpec, + loadFileType.get, + 
inheritTableSpecs: JBoolean, + inheritLocation, + isSkewedStoreAsSubdir: JBoolean, + isSrcLocal: JBoolean, + isAcid, + hasFollowingStatsTask, + writeIdInLoadTableOrPartition, + stmtIdInLoadTableOrPartition, + replace: JBoolean, + isDirectInsert + ) + } + + override def loadDynamicPartitions( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + numDP: Int, + hiveTable: Table): Unit = { + import org.apache.hadoop.hive.ql.exec.Utilities + val loadFileType = if (replace) { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL")) + } else { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING")) + } + assert(loadFileType.isDefined) + + val useAppendForLoad: JBoolean = false + val loadTableDesc = clazzLoadTableDesc + .getConstructor( + classOf[Path], + classOf[Table], + JBoolean.TYPE, + JBoolean.TYPE, + classOf[JMap[String, String]]) + .newInstance( + loadPath, + hiveTable, + replace: JBoolean, + useAppendForLoad, + partSpec) + val ctx = new DynamicPartitionCtx() + ctx.setRootPath(loadPath) + ctx.setNumDPCols(numDP) + ctx.setPartSpec(partSpec) + + val fullDPSpecs = classOf[Utilities].getMethod( + "getFullDPSpecs", + classOf[Configuration], + classOf[DynamicPartitionCtx], + classOf[java.util.Map[String, java.util.List[Path]]]) + fullDPSpecs.setAccessible(true) + recordHiveCall() + val resetPartitionStats: JBoolean = false + loadDynamicPartitionsMethod.invoke( + hive, + loadTableDesc, + listBucketingLevel, + isAcid, + writeIdInLoadTableOrPartition, + stmtIdInLoadTableOrPartition, + resetPartitionStats, + AcidUtils.Operation.NOT_ACID, + fullDPSpecs.invoke(null, hive.getConf, ctx, null) + ) + } + + override def renamePartition( + hive: Hive, + table: Table, + oldPartSpec: JMap[String, String], + newPart: Partition): Unit = { + recordHiveCall() + renamePartitionMethod.invoke(hive, table, oldPartSpec, newPart, writeIdInLoadTableOrPartition) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index b0570f5d30352..5d6f928d53dad 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -98,6 +98,7 @@ private[hive] object IsolatedClientLoader extends Logging { case (2, 3, _) => Some(hive.v2_3) case (3, 0, _) => Some(hive.v3_0) case (3, 1, _) => Some(hive.v3_1) + case (4, 0, _) => Some(hive.v4_0) case _ => None }.getOrElse { throw QueryExecutionErrors.unsupportedHiveMetastoreVersionError( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala index d172af21a9170..6a9815342e73a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala @@ -87,8 +87,22 @@ package object client { "org.pentaho:pentaho-aggdesigner-algorithm", "org.apache.hive:hive-vector-code-gen")) + case object v4_0 extends HiveVersion("4.0.1", + extraDeps = Seq("org.apache.hadoop:hadoop-hdfs:3.3.6", + "org.datanucleus:datanucleus-api-jdo:5.2.8", + "org.datanucleus:datanucleus-rdbms:5.2.10", + "org.datanucleus:javax.jdo:3.2.0-release", + "org.springframework:spring-core:5.3.21", + "org.springframework:spring-jdbc:5.3.21", + "org.antlr:antlr4-runtime:4.9.3", + 
"org.apache.derby:derby:10.14.2.0"), + exclusions = Seq("org.apache.calcite:calcite-druid", + "org.apache.curator:*", + "org.pentaho:pentaho-aggdesigner-algorithm", + "org.apache.hive:hive-vector-code-gen")) + val allSupportedHiveVersions: Set[HiveVersion] = - Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) + Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1, v4_0) } // scalastyle:on diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 07f212d2dcabb..5c65eb8b12bac 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -163,7 +163,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) { // test alter database location val tempDatabasePath2 = Utils.createTempDir().toURI // Hive support altering database location since HIVE-8472. - if (version == "3.0" || version == "3.1") { + if (version == "3.0" || version == "3.1" || version == "4.0") { client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) val uriInCatalog = client.getDatabase("temporary").locationUri assert("file" === uriInCatalog.getScheme) @@ -376,7 +376,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) { CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat) } client.createPartitions( - "default", "src_part", partitions, ignoreIfExists = true) + client.getTable("default", "src_part"), partitions, ignoreIfExists = true) } test("getPartitionNames(catalogTable)") { @@ -479,10 +479,12 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) { val partitions = Seq(CatalogTablePartition( Map("key1" -> "101", "key2" -> "102"), storageFormat)) + val table = client.getTable("default", "src_part") + try { - client.createPartitions("default", "src_part", partitions, ignoreIfExists = false) + client.createPartitions(table, partitions, ignoreIfExists = false) val e = intercept[PartitionsAlreadyExistException] { - client.createPartitions("default", "src_part", partitions, ignoreIfExists = false) + client.createPartitions(table, partitions, ignoreIfExists = false) } checkError(e, condition = "PARTITIONS_ALREADY_EXIST", @@ -558,7 +560,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) { test("sql create index and reset") { // HIVE-18448 Since Hive 3.0, INDEX is not supported. 
- if (version != "3.0" && version != "3.1") { + if (version != "3.0" && version != "3.1" && version != "4.0") { client.runSqlHive("CREATE TABLE indexed_table (key INT)") client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " + "as 'COMPACT' WITH DEFERRED REBUILD") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala index 0bc6702079bdb..f54760e44b969 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala @@ -22,6 +22,6 @@ private[client] trait HiveClientVersions { protected val versions = if (testVersions.nonEmpty) { testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq } else { - IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1") + IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala index 1a4eb75547894..63be4dc363f1a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala @@ -111,8 +111,7 @@ class HivePartitionFilteringSuite(version: String) ), storageFormat) assert(partitions.size == testPartitionCount) - client.createPartitions( - "default", "test", partitions, ignoreIfExists = false) + client.createPartitions(table, partitions, ignoreIfExists = false) client } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala index 5db0b4f18c962..1a45f6b150969 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala @@ -31,14 +31,11 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and // hive.metastore.schema.verification from false to true since 2.0 // For details, see the JIRA HIVE-6113 and HIVE-12463 - if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" || - version == "3.0" || version == "3.1") { - hadoopConf.set("datanucleus.schema.autoCreateAll", "true") - hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored") - hadoopConf.set("hive.metastore.schema.verification", "false") - } + hadoopConf.set("datanucleus.schema.autoCreateAll", "true") + hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored") + hadoopConf.set("hive.metastore.schema.verification", "false") // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`. 
- if (version == "3.0" || version == "3.1") { + if (version == "3.0" || version == "3.1" || version == "4.0") { hadoopConf.set("hive.in.test", "true") hadoopConf.set("hive.query.reexecution.enabled", "false") } From 88c4ea9b0598b4ffbef326e3fb475770ab2d8298 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 11 Nov 2024 17:47:40 +0800 Subject: [PATCH 2/9] [SPARK-45265][SQL] Support Hive Metastore Server 4.0 --- docs/sql-data-sources-hive-tables.md | 2 +- .../spark/sql/hive/client/HiveShim.scala | 63 ++++++++----------- 2 files changed, 28 insertions(+), 37 deletions(-) diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md index 566dcb33a25d9..d45174425f470 100644 --- a/docs/sql-data-sources-hive-tables.md +++ b/docs/sql-data-sources-hive-tables.md @@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used 2.3.10 Version of the Hive metastore. Available - options are 2.0.0 through 2.3.10 and 3.0.0 through 3.1.3. + options are 2.0.0 through 2.3.10, 3.0.0 through 3.1.3, and 4.0.0 through 4.0.1. 1.4.0 diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 32d9ca8373510..dcfece19130b0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -1236,8 +1236,7 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { loadPartitionMethod.invoke(hive, loadPath, table, partSpec, loadFileType.get, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean, isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask, - writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean - ) + writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean) } override def loadTable( @@ -1283,15 +1282,12 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { private[client] class Shim_v3_1 extends Shim_v3_0 private[client] class Shim_v4_0 extends Shim_v3_1 { - - private lazy val addPartitionsMethod = - findMethod( - classOf[Hive], - "addPartitions", - classOf[JList[Partition]], - JBoolean.TYPE, - JBoolean.TYPE) // needResults - + private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass( + "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType") + private lazy val clazzLoadTableDesc = getClass.getClassLoader.loadClass( + "org.apache.hadoop.hive.ql.plan.LoadTableDesc") + private lazy val clazzPartitionDetails = + Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities$PartitionDesc") private lazy val alterTableMethod = findMethod( classOf[Hive], @@ -1300,22 +1296,6 @@ private[client] class Shim_v4_0 extends Shim_v3_1 { classOf[Table], classOf[EnvironmentContext], JBoolean.TYPE) - - private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass( - "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType") - - override def alterTable(hive: Hive, tableName: String, table: Table): Unit = { - recordHiveCall() - val transactional = false - alterTableMethod.invoke( - hive, - tableName, - table, - environmentContextInAlterTable, - transactional: JBoolean - ) - } - private lazy val loadTableMethod = findMethod( classOf[Hive], @@ -1331,12 +1311,13 @@ private[client] class Shim_v4_0 extends Shim_v3_1 { JInteger.TYPE, JBoolean.TYPE, JBoolean.TYPE) - - private lazy val clazzLoadTableDesc = getClass.getClassLoader.loadClass( - "org.apache.hadoop.hive.ql.plan.LoadTableDesc") - - private 
lazy val clazzPartitionDetails = - Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities$PartitionDesc") + private lazy val addPartitionsMethod = + findMethod( + classOf[Hive], + "addPartitions", + classOf[JList[Partition]], + JBoolean.TYPE, + JBoolean.TYPE) // needResults private lazy val alterPartitionsMethod = findMethod( classOf[Hive], @@ -1363,7 +1344,6 @@ private[client] class Shim_v4_0 extends Shim_v3_1 { JInteger.TYPE, JBoolean.TYPE, JBoolean.TYPE) - private lazy val loadDynamicPartitionsMethod = findMethod( classOf[Hive], @@ -1376,7 +1356,6 @@ private[client] class Shim_v4_0 extends Shim_v3_1 { JBoolean.TYPE, // resetStatistics classOf[AcidUtils.Operation], classOf[JMap[Path, clazzPartitionDetails.type]]) - private lazy val renamePartitionMethod = findMethod( classOf[Hive], @@ -1386,6 +1365,18 @@ private[client] class Shim_v4_0 extends Shim_v3_1 { classOf[Partition], JLong.TYPE) + override def alterTable(hive: Hive, tableName: String, table: Table): Unit = { + recordHiveCall() + val transactional = false + alterTableMethod.invoke( + hive, + tableName, + table, + environmentContextInAlterTable, + transactional: JBoolean + ) + } + override def loadTable( hive: Hive, loadPath: Path, @@ -1429,6 +1420,7 @@ private[client] class Shim_v4_0 extends Shim_v3_1 { environmentContextInAlterTable, transactional) } + override def createPartitions( hive: Hive, table: Table, @@ -1519,7 +1511,6 @@ private[client] class Shim_v4_0 extends Shim_v3_1 { classOf[Configuration], classOf[DynamicPartitionCtx], classOf[java.util.Map[String, java.util.List[Path]]]) - fullDPSpecs.setAccessible(true) recordHiveCall() val resetPartitionStats: JBoolean = false loadDynamicPartitionsMethod.invoke( From 5b009269593e6fdecc4285a2adf5dc44464b79f0 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 12 Nov 2024 10:41:41 +0800 Subject: [PATCH 3/9] fix CliSuites --- .../hive/thriftserver/SparkSQLCLIDriver.scala | 39 ++++++++++--------- .../hive/thriftserver/SparkSQLDriver.scala | 2 +- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index faab14bb9e365..083d9c4a0d436 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -458,27 +458,28 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { if (sessionState.getIsVerbose) { out.println(cmd) } - val rc = driver.run(cmd) - val endTimeNs = System.nanoTime() - val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0 - - ret = rc.getResponseCode - if (ret != 0) { - val format = SparkSQLEnv.sparkSession.sessionState.conf.errorMessageFormat - val e = rc.getException - val msg = e match { - case st: SparkThrowable with Throwable => SparkThrowableHelper.getMessage(st, format) - case _ => e.getMessage - } - err.println(msg) - if (format == ErrorMessageFormat.PRETTY && + try { + driver.run(cmd) + } catch { + case t: Throwable => + ret = 1 + val format = SparkSQLEnv.sparkSession.sessionState.conf.errorMessageFormat + val msg = t match { + case st: SparkThrowable with Throwable => + SparkThrowableHelper.getMessage(st, format) + case _ => t.getMessage + } + err.println(msg) + if (format == ErrorMessageFormat.PRETTY && !sessionState.getIsSilent && - 
(!e.isInstanceOf[AnalysisException] || e.getCause != null)) { - e.printStackTrace(err) - } - driver.close() - return ret + (!t.isInstanceOf[AnalysisException] || t.getCause != null)) { + t.printStackTrace(err) + } + driver.close() + return ret } + val endTimeNs = System.nanoTime() + val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0 val res = new JArrayList[String]() diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 08f5d63bba1a6..0801bffed8e52 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -82,7 +82,7 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv. } catch { case st: SparkThrowable => logDebug(s"Failed in [$command]", st) - throw new QueryExecutionException(ExceptionUtils.getStackTrace(st)) + throw st case cause: Throwable => logError(log"Failed in [${MDC(COMMAND, command)}]", cause) throw new QueryExecutionException(ExceptionUtils.getStackTrace(cause)) From f368ff1460d9445ba00853c79d7d95605eff591a Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 12 Nov 2024 15:19:58 +0800 Subject: [PATCH 4/9] response code --- .../sql/hive/client/HiveClientImpl.scala | 55 ++++++++++++++----- 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 3adf331366149..ba03b7fe3cee1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -28,6 +28,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.StatsSetupConst @@ -44,7 +45,7 @@ import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.{SparkConf, SparkException, SparkThrowable} import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.internal.LogKeys._ @@ -877,11 +878,22 @@ private[hive] class HiveClientImpl( // Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed // and the CommandProcessorFactory.clean function removed. driver.getClass.getMethod("close").invoke(driver) - if (version != hive.v3_0 && version != hive.v3_1) { + if (version != hive.v3_0 && version != hive.v3_1 && version != hive.v4_0) { CommandProcessorFactory.clean(conf) } } + def getResponseCode(response: CommandProcessorResponse): Int = { + if (version < hive.v4_0) { + response.getResponseCode + } else { + // Since Hive 4.0, response code is removed from CommandProcessorResponse. + // Here we simply return 0 for the positive cases as for error cases it will + // throw exceptions early. + 0 + } + } + // Hive query needs to start SessionState. 
SessionState.start(state) logDebug(s"Running hiveql '$cmd'") @@ -894,17 +906,27 @@ private[hive] class HiveClientImpl( val proc = shim.getCommandProcessor(tokens(0), conf) proc match { case driver: Driver => - val response: CommandProcessorResponse = driver.run(cmd) - // Throw an exception if there is an error in query processing. - if (response.getResponseCode != 0) { + try { + val response: CommandProcessorResponse = driver.run(cmd) + if (getResponseCode(response) != 0) { + // Throw an exception if there is an error in query processing. + // This works for hive 3.x and earlier versions. + throw new QueryExecutionException(response.getErrorMessage) + } + driver.setMaxRows(maxRows) + val results = shim.getDriverResults(driver) + results + } catch { + case e @ (_: QueryExecutionException | _: SparkThrowable) => + throw e + case e: Exception => + // Wrap the original hive error with QueryExecutionException and throw it + // if there is an error in query processing. + // This works for hive 4.x and later versions. + throw new QueryExecutionException(ExceptionUtils.getStackTrace(e)) + } finally { closeDriver(driver) - throw new QueryExecutionException(response.getErrorMessage) } - driver.setMaxRows(maxRows) - - val results = shim.getDriverResults(driver) - closeDriver(driver) - results case _ => val out = state.getClass.getField("out").get(state) @@ -913,8 +935,15 @@ private[hive] class HiveClientImpl( out.asInstanceOf[PrintStream].println(tokens(0) + " " + cmd_1) // scalastyle:on println } - proc.run(cmd_1) - Seq("0") + val response: CommandProcessorResponse = proc.run(cmd_1) + val responseCode = getResponseCode(response) + if (responseCode != 0) { + // Throw an exception if there is an error in query processing. + // This works for hive 3.x and earlier versions. For 4.x and later versions, + // It will go to the catch block directly. + throw new QueryExecutionException(response.getErrorMessage) + } + Seq(responseCode.toString) } } catch { case e: Exception => From e71bd5fa397520d86ecc784bf7794181fd5c4272 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 12 Nov 2024 23:22:36 +0800 Subject: [PATCH 5/9] timeout 240 --- .github/workflows/build_and_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index e51c896cc3f99..41a73c6f2981c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -141,7 +141,7 @@ jobs: needs: precondition if: fromJson(needs.precondition.outputs.required).build == 'true' runs-on: ubuntu-latest - timeout-minutes: 180 + timeout-minutes: 240 strategy: fail-fast: false matrix: From e952423275135fd301f80e055593a5525ff06f76 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 13 Nov 2024 18:53:49 +0800 Subject: [PATCH 6/9] Revert "timeout 240" This reverts commit e71bd5fa397520d86ecc784bf7794181fd5c4272. 
--- .github/workflows/build_and_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index e13eb5056f98d..696bbf9cfe41c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -161,7 +161,7 @@ jobs: needs: precondition if: fromJson(needs.precondition.outputs.required).build == 'true' runs-on: ubuntu-latest - timeout-minutes: 240 + timeout-minutes: 180 strategy: fail-fast: false matrix: From 24b4f584289b5c3e0ac05581a1b135640460ede0 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 13 Nov 2024 18:54:15 +0800 Subject: [PATCH 7/9] getPartitionsByFilter LIKE wildcard --- .../org/apache/spark/sql/hive/client/HiveShim.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index dcfece19130b0..b17b68ad99592 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -259,6 +259,8 @@ private[client] class Shim_v2_0 extends Shim with Logging { // txnId can be 0 unless isAcid == true protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L + protected lazy val wildcard: String = ".*" + override def getMSC(hive: Hive): IMetaStoreClient = hive.getMSC private lazy val loadPartitionMethod = @@ -820,13 +822,13 @@ private[client] class Shim_v2_0 extends Shim with Logging { Some(s"$value ${op.symbol} $name") case Contains(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) => - Some(s"$name like " + (("\".*" + value.drop(1)).dropRight(1) + ".*\"")) + Some(s"$name like " + (("\"" + wildcard + value.drop(1)).dropRight(1) + wildcard + "\"")) case StartsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) => - Some(s"$name like " + (value.dropRight(1) + ".*\"")) + Some(s"$name like " + (value.dropRight(1) + wildcard + "\"")) case EndsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) => - Some(s"$name like " + ("\".*" + value.drop(1))) + Some(s"$name like " + ("\"" + wildcard + value.drop(1))) case And(expr1, expr2) if useAdvanced => val converted = convert(expr1) ++ convert(expr2) @@ -1288,6 +1290,9 @@ private[client] class Shim_v4_0 extends Shim_v3_1 { "org.apache.hadoop.hive.ql.plan.LoadTableDesc") private lazy val clazzPartitionDetails = Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities$PartitionDesc") + + override protected lazy val wildcard: String = "%" + private lazy val alterTableMethod = findMethod( classOf[Hive], From 3552e06ccdf651e9b0ad8ef3efdf2302d0c7f41b Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 14 Nov 2024 11:03:13 +0800 Subject: [PATCH 8/9] derby stackoverflow --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 0c1b1afbdc1c4..b2880efb6527c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1664,7 +1664,7 @@ object TestSettings { "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", "-Djdk.reflect.useDirectMethodHandle=false", "-Dio.netty.tryReflectionSetAccessible=true").mkString(" ") - s"-Xmx$heapSize -Xss4m -XX:MaxMetaspaceSize=$metaspaceSize -XX:ReservedCodeCacheSize=128m -Dfile.encoding=UTF-8 $extraTestJavaArgs" + s"-Xmx$heapSize -Xss64m 
-XX:MaxMetaspaceSize=$metaspaceSize -XX:ReservedCodeCacheSize=128m -Dfile.encoding=UTF-8 $extraTestJavaArgs" .split(" ").toSeq }, javaOptions ++= { From f0c36b7a34c52c79eff3d33db5fbc97ae291efe5 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 14 Nov 2024 14:07:31 +0800 Subject: [PATCH 9/9] derby stackoverflow --- project/SparkBuild.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b2880efb6527c..5092b9551af1b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1183,6 +1183,14 @@ object Hive { // Hive tests need higher metaspace size (Test / javaOptions) := (Test / javaOptions).value.filterNot(_.contains("MaxMetaspaceSize")), (Test / javaOptions) += "-XX:MaxMetaspaceSize=2g", + // SPARK-45265: HivePartitionFilteringSuite addPartitions related tests generate super long + // direct sql against derby server, which may cause stack overflow error when derby does sql + // parsing. + // We need to increase the Xss for the test. Meanwhile, QueryParsingErrorsSuite requires a + // smaller size of Xss to mock a FAILED_TO_PARSE_TOO_COMPLEX error, so we need to set it for the + // hive module specifically. + (Test / javaOptions) := (Test / javaOptions).value.filterNot(_.contains("Xss")), + (Test / javaOptions) += "-Xss64m", // Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings // only for this subproject. scalacOptions := (scalacOptions map { currentOpts: Seq[String] => @@ -1664,7 +1672,7 @@ object TestSettings { "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", "-Djdk.reflect.useDirectMethodHandle=false", "-Dio.netty.tryReflectionSetAccessible=true").mkString(" ") - s"-Xmx$heapSize -Xss64m -XX:MaxMetaspaceSize=$metaspaceSize -XX:ReservedCodeCacheSize=128m -Dfile.encoding=UTF-8 $extraTestJavaArgs" + s"-Xmx$heapSize -Xss4m -XX:MaxMetaspaceSize=$metaspaceSize -XX:ReservedCodeCacheSize=128m -Dfile.encoding=UTF-8 $extraTestJavaArgs" .split(" ").toSeq }, javaOptions ++= {