diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md
index 566dcb33a25d9..d45174425f470 100644
--- a/docs/sql-data-sources-hive-tables.md
+++ b/docs/sql-data-sources-hive-tables.md
@@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used
 2.3.10
- options are 2.0.0 through 2.3.10 and 3.0.0 through 3.1.3.
+ options are 2.0.0 through 2.3.10, 3.0.0 through 3.1.3, and 4.0.0 through 4.0.1.
- "2.0.0 through 2.3.10 and " +
- "3.0.0 through 3.1.3.")
+ "2.0.0 through 2.3.10, " +
+ "3.0.0 through 3.1.3 and " +
+ "4.0.0 through 4.0.1.")
.version("1.4.0")
.stringConf
.checkValue(isCompatibleHiveVersion, "Unsupported Hive Metastore version")
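With 4.0.x now accepted by the version check above, a session can point the built-in catalog at a Hive 4.0 metastore through the existing spark.sql.hive.metastore.* options. A minimal sketch, with illustrative values (not defaults):

    import org.apache.spark.sql.SparkSession

    // Sketch only: selects the Hive 4.0.1 metastore client; "maven" downloads the
    // client jars at runtime (see the v4_0 entry added to package.scala below).
    val spark = SparkSession.builder()
      .appName("hive-4-metastore")
      .config("spark.sql.hive.metastore.version", "4.0.1")
      .config("spark.sql.hive.metastore.jars", "maven")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("SHOW DATABASES").show()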
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index ff225e7a50f0d..3d89f31e1965b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -164,8 +164,7 @@ private[hive] trait HiveClient {
* Create one or many partitions in the given table.
*/
def createPartitions(
- db: String,
- table: String,
+ table: CatalogTable,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 735814c9ae084..ba03b7fe3cee1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.client
-import java.io.PrintStream
+import java.io.{OutputStream, PrintStream}
import java.lang.{Iterable => JIterable}
import java.lang.reflect.InvocationTargetException
import java.nio.charset.StandardCharsets.UTF_8
@@ -28,6 +28,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
@@ -44,7 +45,7 @@ import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkException, SparkThrowable}
import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
@@ -121,6 +122,7 @@ private[hive] class HiveClientImpl(
case hive.v2_3 => new Shim_v2_3()
case hive.v3_0 => new Shim_v3_0()
case hive.v3_1 => new Shim_v3_1()
+ case hive.v4_0 => new Shim_v4_0()
}
// Create an internal session state for this HiveClientImpl.
@@ -177,8 +179,10 @@ private[hive] class HiveClientImpl(
// got changed. We reset it to clientLoader.ClassLoader here.
state.getConf.setClassLoader(clientLoader.classLoader)
shim.setCurrentSessionState(state)
- state.out = new PrintStream(outputBuffer, true, UTF_8.name())
- state.err = new PrintStream(outputBuffer, true, UTF_8.name())
+ val clz = state.getClass.getField("out").getType.asInstanceOf[Class[_ <: PrintStream]]
+ val ctor = clz.getConstructor(classOf[OutputStream], classOf[Boolean], classOf[String])
+ state.getClass.getField("out").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name()))
+ state.getClass.getField("err").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name()))
state
}
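The reflection above is needed because the declared type of SessionState.out/err is no longer a plain PrintStream on Hive 4.x; constructing an instance of the field's own declared type keeps a single code path across all supported versions. A standalone sketch of the same pattern (the helper name is ours; it assumes, as the code above does, that the field's class exposes an (OutputStream, boolean, String) constructor):

    import java.io.{OutputStream, PrintStream}
    import java.nio.charset.StandardCharsets.UTF_8

    // Hypothetical helper: look up the public field, take its declared type
    // (PrintStream or a subclass), and instantiate exactly that type reflectively.
    def resetStreamField(owner: AnyRef, fieldName: String, sink: OutputStream): Unit = {
      val field = owner.getClass.getField(fieldName)
      val clz = field.getType.asInstanceOf[Class[_ <: PrintStream]]
      val ctor = clz.getConstructor(classOf[OutputStream], classOf[Boolean], classOf[String])
      field.set(owner, ctor.newInstance(sink, true, UTF_8.name()))
    }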
@@ -307,15 +311,27 @@ private[hive] class HiveClientImpl(
}
def setOut(stream: PrintStream): Unit = withHiveState {
- state.out = stream
+ val ctor = state.getClass.getField("out")
+ .getType
+ .asInstanceOf[Class[_ <: PrintStream]]
+ .getConstructor(classOf[OutputStream])
+ state.getClass.getField("out").set(state, ctor.newInstance(stream))
}
def setInfo(stream: PrintStream): Unit = withHiveState {
- state.info = stream
+ val ctor = state.getClass.getField("info")
+ .getType
+ .asInstanceOf[Class[_ <: PrintStream]]
+ .getConstructor(classOf[OutputStream])
+ state.getClass.getField("info").set(state, ctor.newInstance(stream))
}
def setError(stream: PrintStream): Unit = withHiveState {
- state.err = stream
+ val ctor = state.getClass.getField("err")
+ .getType
+ .asInstanceOf[Class[_ <: PrintStream]]
+ .getConstructor(classOf[OutputStream])
+ state.getClass.getField("err").set(state, ctor.newInstance(stream))
}
private def setCurrentDatabaseRaw(db: String): Unit = {
@@ -629,21 +645,22 @@ private[hive] class HiveClientImpl(
}
override def createPartitions(
- db: String,
- table: String,
+ table: CatalogTable,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = withHiveState {
def replaceExistException(e: Throwable): Unit = e match {
case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
- val hiveTable = client.getTable(db, table)
+ val db = table.identifier.database.getOrElse(state.getCurrentDatabase)
+ val tableName = table.identifier.table
+ val hiveTable = client.getTable(db, tableName)
val existingParts = parts.filter { p =>
shim.getPartitions(client, hiveTable, p.spec.asJava).nonEmpty
}
- throw new PartitionsAlreadyExistException(db, table, existingParts.map(_.spec))
+ throw new PartitionsAlreadyExistException(db, tableName, existingParts.map(_.spec))
case _ => throw e
}
try {
- shim.createPartitions(client, db, table, parts, ignoreIfExists)
+ shim.createPartitions(client, toHiveTable(table), parts, ignoreIfExists)
} catch {
case e: InvocationTargetException => replaceExistException(e.getCause)
case e: Throwable => replaceExistException(e)
@@ -861,11 +878,22 @@ private[hive] class HiveClientImpl(
// Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed
// and the CommandProcessorFactory.clean function removed.
driver.getClass.getMethod("close").invoke(driver)
- if (version != hive.v3_0 && version != hive.v3_1) {
+ if (version != hive.v3_0 && version != hive.v3_1 && version != hive.v4_0) {
CommandProcessorFactory.clean(conf)
}
}
+ def getResponseCode(response: CommandProcessorResponse): Int = {
+ if (version < hive.v4_0) {
+ response.getResponseCode
+ } else {
+ // Since Hive 4.0, the response code has been removed from CommandProcessorResponse.
+ // Here we simply return 0 for the success path, since error cases throw
+ // exceptions before reaching this point.
+ 0
+ }
+ }
+
// Hive query needs to start SessionState.
SessionState.start(state)
logDebug(s"Running hiveql '$cmd'")
@@ -878,30 +906,44 @@ private[hive] class HiveClientImpl(
val proc = shim.getCommandProcessor(tokens(0), conf)
proc match {
case driver: Driver =>
- val response: CommandProcessorResponse = driver.run(cmd)
- // Throw an exception if there is an error in query processing.
- if (response.getResponseCode != 0) {
+ try {
+ val response: CommandProcessorResponse = driver.run(cmd)
+ if (getResponseCode(response) != 0) {
+ // Throw an exception if there is an error in query processing.
+ // This works for Hive 3.x and earlier versions.
+ throw new QueryExecutionException(response.getErrorMessage)
+ }
+ driver.setMaxRows(maxRows)
+ val results = shim.getDriverResults(driver)
+ results
+ } catch {
+ case e @ (_: QueryExecutionException | _: SparkThrowable) =>
+ throw e
+ case e: Exception =>
+ // Wrap the original Hive error with QueryExecutionException and throw it
+ // if there is an error in query processing.
+ // This works for Hive 4.x and later versions.
+ throw new QueryExecutionException(ExceptionUtils.getStackTrace(e))
+ } finally {
closeDriver(driver)
- throw new QueryExecutionException(response.getErrorMessage)
}
- driver.setMaxRows(maxRows)
-
- val results = shim.getDriverResults(driver)
- closeDriver(driver)
- results
case _ =>
- if (state.out != null) {
+ val out = state.getClass.getField("out").get(state)
+ if (out != null) {
// scalastyle:off println
- state.out.println(tokens(0) + " " + cmd_1)
+ out.asInstanceOf[PrintStream].println(tokens(0) + " " + cmd_1)
// scalastyle:on println
}
val response: CommandProcessorResponse = proc.run(cmd_1)
- // Throw an exception if there is an error in query processing.
- if (response.getResponseCode != 0) {
+ val responseCode = getResponseCode(response)
+ if (responseCode != 0) {
+ // Throw an exception if there is an error in query processing.
+ // This works for Hive 3.x and earlier versions. For 4.x and later versions,
+ // it will go to the catch block directly.
throw new QueryExecutionException(response.getErrorMessage)
}
- Seq(response.getResponseCode.toString)
+ Seq(responseCode.toString)
}
} catch {
case e: Exception =>
@@ -971,7 +1013,7 @@ private[hive] class HiveClientImpl(
partSpec,
replace,
numDP,
- listBucketingEnabled = hiveTable.isStoredAsSubDirectories)
+ hiveTable)
}
override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index c03fed4cc3184..b17b68ad99592 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{IMetaStoreClient, PartitionDropOptions, TableType}
@@ -32,7 +33,7 @@ import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Funct
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.io.AcidUtils
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
-import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
+import org.apache.hadoop.hive.ql.plan.{AddPartitionDesc, DynamicPartitionCtx}
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
@@ -42,15 +43,16 @@ import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2, CONFIG3}
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
-import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, CatalogTablePartition, CatalogUtils, ExternalCatalogUtils, FunctionResource, FunctionResourceType}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateFormatter, TypeUtils}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{AtomicType, DateType, IntegralType, IntegralTypeExpression, StringType}
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
/**
* A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
@@ -161,8 +163,7 @@ private[client] sealed abstract class Shim {
def createPartitions(
hive: Hive,
- dbName: String,
- tableName: String,
+ table: Table,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit
@@ -196,7 +197,7 @@ private[client] sealed abstract class Shim {
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
- listBucketingEnabled: Boolean): Unit
+ hiveTable: Table): Unit
def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit
@@ -258,6 +259,8 @@ private[client] class Shim_v2_0 extends Shim with Logging {
// txnId can be 0 unless isAcid == true
protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
+ protected lazy val wildcard: String = ".*"
+
override def getMSC(hive: Hive): IMetaStoreClient = hive.getMSC
private lazy val loadPartitionMethod =
@@ -318,11 +321,10 @@ private[client] class Shim_v2_0 extends Shim with Logging {
override def createPartitions(
hive: Hive,
- dbName: String,
- tableName: String,
+ table: Table,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
- val addPartitionDesc = new AddPartitionDesc(dbName, tableName, ignoreIfExists)
+ val addPartitionDesc = new AddPartitionDesc(table.getDbName, table.getTableName, ignoreIfExists)
parts.zipWithIndex.foreach { case (s, i) =>
addPartitionDesc.addPartition(
s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString).orNull)
@@ -504,8 +506,9 @@ private[client] class Shim_v2_0 extends Shim with Logging {
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
- listBucketingEnabled: Boolean): Unit = {
+ hiveTable: Table): Unit = {
recordHiveCall()
+ val listBucketingEnabled = hiveTable.isStoredAsSubDirectories
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions)
}
@@ -819,13 +822,13 @@ private[client] class Shim_v2_0 extends Shim with Logging {
Some(s"$value ${op.symbol} $name")
case Contains(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
- Some(s"$name like " + (("\".*" + value.drop(1)).dropRight(1) + ".*\""))
+ Some(s"$name like " + (("\"" + wildcard + value.drop(1)).dropRight(1) + wildcard + "\""))
case StartsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
- Some(s"$name like " + (value.dropRight(1) + ".*\""))
+ Some(s"$name like " + (value.dropRight(1) + wildcard + "\""))
case EndsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
- Some(s"$name like " + ("\".*" + value.drop(1)))
+ Some(s"$name like " + ("\"" + wildcard + value.drop(1)))
case And(expr1, expr2) if useAdvanced =>
val converted = convert(expr1) ++ convert(expr2)
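Only the wildcard token changes between the shims in the Contains/StartsWith/EndsWith cases above. A small standalone illustration of what the Contains branch renders, assuming the literal arrives already quoted the way ExtractableLiteral produces it:

    // Not Spark's code; mirrors the Contains rendering above for a column "p".
    def containsFilter(name: String, quotedValue: String, wildcard: String): String =
      s"$name like " + (("\"" + wildcard + quotedValue.drop(1)).dropRight(1) + wildcard + "\"")

    containsFilter("p", "\"ab\"", ".*") // p like ".*ab.*"  (Hive 2.x/3.x shims)
    containsFilter("p", "\"ab\"", "%")  // p like "%ab%"    (Shim_v4_0, SQL LIKE wildcard)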
@@ -1096,8 +1099,9 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
- listBucketingEnabled: Boolean): Unit = {
+ hiveTable: Table): Unit = {
recordHiveCall()
+ val listBucketingEnabled = hiveTable.isStoredAsSubDirectories
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions,
hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID)
@@ -1262,7 +1266,7 @@ private[client] class Shim_v3_0 extends Shim_v2_3 {
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
- listBucketingEnabled: Boolean): Unit = {
+ hiveTable: Table): Unit = {
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
@@ -1278,3 +1282,261 @@ private[client] class Shim_v3_0 extends Shim_v2_3 {
}
private[client] class Shim_v3_1 extends Shim_v3_0
+
+private[client] class Shim_v4_0 extends Shim_v3_1 {
+ private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
+ "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
+ private lazy val clazzLoadTableDesc = getClass.getClassLoader.loadClass(
+ "org.apache.hadoop.hive.ql.plan.LoadTableDesc")
+ private lazy val clazzPartitionDetails =
+ Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities$PartitionDesc")
+
+ override protected lazy val wildcard: String = "%"
+
+ private lazy val alterTableMethod =
+ findMethod(
+ classOf[Hive],
+ "alterTable",
+ classOf[String],
+ classOf[Table],
+ classOf[EnvironmentContext],
+ JBoolean.TYPE)
+ private lazy val loadTableMethod =
+ findMethod(
+ classOf[Hive],
+ "loadTable",
+ classOf[Path],
+ classOf[String],
+ clazzLoadFileType,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ classOf[JLong],
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val addPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "addPartitions",
+ classOf[JList[Partition]],
+ JBoolean.TYPE,
+ JBoolean.TYPE) // needResults
+ private lazy val alterPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "alterPartitions",
+ classOf[String],
+ classOf[JList[Partition]],
+ classOf[EnvironmentContext],
+ JBoolean.TYPE)
+ private lazy val loadPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "loadPartition",
+ classOf[Path],
+ classOf[Table],
+ classOf[JMap[String, String]],
+ clazzLoadFileType,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ classOf[JLong],
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadDynamicPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "loadDynamicPartitions",
+ clazzLoadTableDesc,
+ JInteger.TYPE, // numLB number of buckets
+ JBoolean.TYPE, // isAcid true if this is an ACID operation
+ JLong.TYPE, // writeId, can be 0 unless isAcid == true
+ JInteger.TYPE, // stmtId
+ JBoolean.TYPE, // resetStatistics
+ classOf[AcidUtils.Operation],
+ classOf[JMap[Path, clazzPartitionDetails.type]])
+ private lazy val renamePartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "renamePartition",
+ classOf[Table],
+ classOf[JMap[String, String]],
+ classOf[Partition],
+ JLong.TYPE)
+
+ override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
+ recordHiveCall()
+ val transactional = false
+ alterTableMethod.invoke(
+ hive,
+ tableName,
+ table,
+ environmentContextInAlterTable,
+ transactional: JBoolean
+ )
+ }
+
+ override def loadTable(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ replace: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ val loadFileType = if (replace) {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+ recordHiveCall()
+ val resetStatistics = false
+ val isDirectInsert = false
+ loadTableMethod.invoke(
+ hive,
+ loadPath,
+ tableName,
+ loadFileType.get,
+ isSrcLocal: JBoolean,
+ isSkewedStoreAsSubdir,
+ isAcidIUDoperation,
+ resetStatistics,
+ writeIdInLoadTableOrPartition,
+ stmtIdInLoadTableOrPartition: JInteger,
+ replace: JBoolean,
+ isDirectInsert: JBoolean)
+ }
+
+ override def alterPartitions(
+ hive: Hive,
+ tableName: String,
+ newParts: JList[Partition]): Unit = {
+ recordHiveCall()
+ val transactional: JBoolean = false
+ alterPartitionsMethod.invoke(
+ hive,
+ tableName,
+ newParts,
+ environmentContextInAlterTable,
+ transactional)
+ }
+
+ override def createPartitions(
+ hive: Hive,
+ table: Table,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = {
+ val partitions = parts.map(HiveClientImpl.toHivePartition(_, table).getTPartition).asJava
+ recordHiveCall()
+ val needResults = false
+ addPartitionsMethod.invoke(hive, partitions, ignoreIfExists, needResults: JBoolean)
+ }
+
+ override def loadPartition(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ recordHiveCall()
+ val table = hive.getTable(tableName)
+ val loadFileType = if (replace) {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+ val inheritLocation: JBoolean = false
+ val isDirectInsert: JBoolean = false
+ recordHiveCall()
+ loadPartitionMethod.invoke(
+ hive,
+ loadPath,
+ table,
+ partSpec,
+ loadFileType.get,
+ inheritTableSpecs: JBoolean,
+ inheritLocation,
+ isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean,
+ isAcid,
+ hasFollowingStatsTask,
+ writeIdInLoadTableOrPartition,
+ stmtIdInLoadTableOrPartition,
+ replace: JBoolean,
+ isDirectInsert
+ )
+ }
+
+ override def loadDynamicPartitions(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ numDP: Int,
+ hiveTable: Table): Unit = {
+ import org.apache.hadoop.hive.ql.exec.Utilities
+ val loadFileType = if (replace) {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
+ } else {
+ clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
+ }
+ assert(loadFileType.isDefined)
+
+ val useAppendForLoad: JBoolean = false
+ val loadTableDesc = clazzLoadTableDesc
+ .getConstructor(
+ classOf[Path],
+ classOf[Table],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ classOf[JMap[String, String]])
+ .newInstance(
+ loadPath,
+ hiveTable,
+ replace: JBoolean,
+ useAppendForLoad,
+ partSpec)
+ val ctx = new DynamicPartitionCtx()
+ ctx.setRootPath(loadPath)
+ ctx.setNumDPCols(numDP)
+ ctx.setPartSpec(partSpec)
+
+ val fullDPSpecs = classOf[Utilities].getMethod(
+ "getFullDPSpecs",
+ classOf[Configuration],
+ classOf[DynamicPartitionCtx],
+ classOf[java.util.Map[String, java.util.List[Path]]])
+ recordHiveCall()
+ val resetPartitionStats: JBoolean = false
+ loadDynamicPartitionsMethod.invoke(
+ hive,
+ loadTableDesc,
+ listBucketingLevel,
+ isAcid,
+ writeIdInLoadTableOrPartition,
+ stmtIdInLoadTableOrPartition,
+ resetPartitionStats,
+ AcidUtils.Operation.NOT_ACID,
+ fullDPSpecs.invoke(null, hive.getConf, ctx, null)
+ )
+ }
+
+ override def renamePartition(
+ hive: Hive,
+ table: Table,
+ oldPartSpec: JMap[String, String],
+ newPart: Partition): Unit = {
+ recordHiveCall()
+ renamePartitionMethod.invoke(hive, table, oldPartSpec, newPart, writeIdInLoadTableOrPartition)
+ }
+}
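Every Hive 4.x call in Shim_v4_0 goes through the lazy findMethod lookups declared at the top of the class, so the new signatures are only resolved against the jars the isolated classloader actually provides. An illustrative stand-in for that pattern (not Spark's helper):

    import java.lang.reflect.Method

    // Resolve the target method lazily against whatever Hive jars are on the
    // classloader, then invoke it with (boxed) arguments.
    class ReflectiveCall(klass: Class[_], name: String, argTypes: Class[_]*) {
      private lazy val method: Method = klass.getMethod(name, argTypes: _*)
      def apply(target: AnyRef, args: AnyRef*): AnyRef = method.invoke(target, args: _*)
    }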
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index b0570f5d30352..5d6f928d53dad 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -98,6 +98,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case (2, 3, _) => Some(hive.v2_3)
case (3, 0, _) => Some(hive.v3_0)
case (3, 1, _) => Some(hive.v3_1)
+ case (4, 0, _) => Some(hive.v4_0)
case _ => None
}.getOrElse {
throw QueryExecutionErrors.unsupportedHiveMetastoreVersionError(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index d172af21a9170..6a9815342e73a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -87,8 +87,22 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm",
"org.apache.hive:hive-vector-code-gen"))
+ case object v4_0 extends HiveVersion("4.0.1",
+ extraDeps = Seq("org.apache.hadoop:hadoop-hdfs:3.3.6",
+ "org.datanucleus:datanucleus-api-jdo:5.2.8",
+ "org.datanucleus:datanucleus-rdbms:5.2.10",
+ "org.datanucleus:javax.jdo:3.2.0-release",
+ "org.springframework:spring-core:5.3.21",
+ "org.springframework:spring-jdbc:5.3.21",
+ "org.antlr:antlr4-runtime:4.9.3",
+ "org.apache.derby:derby:10.14.2.0"),
+ exclusions = Seq("org.apache.calcite:calcite-druid",
+ "org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm",
+ "org.apache.hive:hive-vector-code-gen"))
+
val allSupportedHiveVersions: Set[HiveVersion] =
- Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
+ Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1, v4_0)
}
// scalastyle:on
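The extraDeps listed for v4_0 only come into play when the metastore client jars are resolved at runtime. A sketch of the two usual ways to supply Hive 4.0.1 jars (values are placeholders, not defaults):

    import org.apache.spark.SparkConf

    // Resolve hive 4.0.1 plus the extra dependencies above from Maven at runtime.
    val viaMaven = new SparkConf()
      .set("spark.sql.hive.metastore.version", "4.0.1")
      .set("spark.sql.hive.metastore.jars", "maven")

    // Or point at a pre-downloaded directory of Hive 4.0.1 jars (placeholder path).
    val viaLocalPath = new SparkConf()
      .set("spark.sql.hive.metastore.version", "4.0.1")
      .set("spark.sql.hive.metastore.jars", "path")
      .set("spark.sql.hive.metastore.jars.path", "/opt/hive-4.0.1/lib/*.jar")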
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 07f212d2dcabb..5c65eb8b12bac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -163,7 +163,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
// test alter database location
val tempDatabasePath2 = Utils.createTempDir().toURI
// Hive support altering database location since HIVE-8472.
- if (version == "3.0" || version == "3.1") {
+ if (version == "3.0" || version == "3.1" || version == "4.0") {
client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
val uriInCatalog = client.getDatabase("temporary").locationUri
assert("file" === uriInCatalog.getScheme)
@@ -376,7 +376,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
}
client.createPartitions(
- "default", "src_part", partitions, ignoreIfExists = true)
+ client.getTable("default", "src_part"), partitions, ignoreIfExists = true)
}
test("getPartitionNames(catalogTable)") {
@@ -479,10 +479,12 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
val partitions = Seq(CatalogTablePartition(
Map("key1" -> "101", "key2" -> "102"),
storageFormat))
+ val table = client.getTable("default", "src_part")
+
try {
- client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+ client.createPartitions(table, partitions, ignoreIfExists = false)
val e = intercept[PartitionsAlreadyExistException] {
- client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+ client.createPartitions(table, partitions, ignoreIfExists = false)
}
checkError(e,
condition = "PARTITIONS_ALREADY_EXIST",
@@ -558,7 +560,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
test("sql create index and reset") {
// HIVE-18448 Since Hive 3.0, INDEX is not supported.
- if (version != "3.0" && version != "3.1") {
+ if (version != "3.0" && version != "3.1" && version != "4.0") {
client.runSqlHive("CREATE TABLE indexed_table (key INT)")
client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
"as 'COMPACT' WITH DEFERRED REBUILD")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
index 0bc6702079bdb..f54760e44b969 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -22,6 +22,6 @@ private[client] trait HiveClientVersions {
protected val versions = if (testVersions.nonEmpty) {
testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq
} else {
- IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1")
+ IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
index 1a4eb75547894..63be4dc363f1a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
@@ -111,8 +111,7 @@ class HivePartitionFilteringSuite(version: String)
), storageFormat)
assert(partitions.size == testPartitionCount)
- client.createPartitions(
- "default", "test", partitions, ignoreIfExists = false)
+ client.createPartitions(table, partitions, ignoreIfExists = false)
client
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index 5db0b4f18c962..1a45f6b150969 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -31,14 +31,11 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
- if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
- version == "3.0" || version == "3.1") {
- hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
- hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored")
- hadoopConf.set("hive.metastore.schema.verification", "false")
- }
+ hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+ hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored")
+ hadoopConf.set("hive.metastore.schema.verification", "false")
// Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
- if (version == "3.0" || version == "3.1") {
+ if (version == "3.0" || version == "3.1" || version == "4.0") {
hadoopConf.set("hive.in.test", "true")
hadoopConf.set("hive.query.reexecution.enabled", "false")
}