[SPARK-45265][SQL] Support Hive 4.0 metastore #48823

Closed
wants to merge 11 commits
2 changes: 1 addition & 1 deletion docs/sql-data-sources-hive-tables.md
@@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used
<td><code>2.3.10</code></td>
<td>
Version of the Hive metastore. Available
options are <code>2.0.0</code> through <code>2.3.10</code> and <code>3.0.0</code> through <code>3.1.3</code>.
options are <code>2.0.0</code> through <code>2.3.10</code>, <code>3.0.0</code> through <code>3.1.3</code>, and <code>4.0.0</code> through <code>4.0.1</code>.
</td>
<td>1.4.0</td>
</tr>
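For reference, a minimal sketch of pointing a Spark session at a Hive 4.0 metastore once this option is available; the application name and the choice of "maven" for jar resolution are illustrative, not part of this change:

    import org.apache.spark.sql.SparkSession

    // Illustrative only: resolve Hive 4.0.1 client jars from Maven and talk to a 4.0 metastore.
    val spark = SparkSession.builder()
      .appName("hive-4-metastore-example")
      .config("spark.sql.hive.metastore.version", "4.0.1")
      .config("spark.sql.hive.metastore.jars", "maven")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("SHOW DATABASES").show()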
8 changes: 8 additions & 0 deletions project/SparkBuild.scala
@@ -1183,6 +1183,14 @@ object Hive {
// Hive tests need higher metaspace size
(Test / javaOptions) := (Test / javaOptions).value.filterNot(_.contains("MaxMetaspaceSize")),
(Test / javaOptions) += "-XX:MaxMetaspaceSize=2g",
// SPARK-45265: HivePartitionFilteringSuite addPartitions related tests generate super long
// direct SQL against the Derby server, which may cause a stack overflow error when Derby
// parses the SQL.
// We need to increase Xss for the test. Meanwhile, QueryParsingErrorsSuite requires a
// smaller Xss to mock a FAILED_TO_PARSE_TOO_COMPLEX error, so we need to set it for the
// hive module specifically.
(Test / javaOptions) := (Test / javaOptions).value.filterNot(_.contains("Xss")),
(Test / javaOptions) += "-Xss64m",
// Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings
// only for this subproject.
scalacOptions := (scalacOptions map { currentOpts: Seq[String] =>
@@ -458,27 +458,28 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
if (sessionState.getIsVerbose) {
out.println(cmd)
}
val rc = driver.run(cmd)
val endTimeNs = System.nanoTime()
val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0

ret = rc.getResponseCode
if (ret != 0) {
val format = SparkSQLEnv.sparkSession.sessionState.conf.errorMessageFormat
val e = rc.getException
val msg = e match {
case st: SparkThrowable with Throwable => SparkThrowableHelper.getMessage(st, format)
case _ => e.getMessage
}
err.println(msg)
if (format == ErrorMessageFormat.PRETTY &&
try {
driver.run(cmd)
} catch {
case t: Throwable =>
ret = 1
val format = SparkSQLEnv.sparkSession.sessionState.conf.errorMessageFormat
val msg = t match {
case st: SparkThrowable with Throwable =>
SparkThrowableHelper.getMessage(st, format)
case _ => t.getMessage
}
err.println(msg)
if (format == ErrorMessageFormat.PRETTY &&
!sessionState.getIsSilent &&
(!e.isInstanceOf[AnalysisException] || e.getCause != null)) {
e.printStackTrace(err)
}
driver.close()
return ret
(!t.isInstanceOf[AnalysisException] || t.getCause != null)) {
t.printStackTrace(err)
}
driver.close()
return ret
}
val endTimeNs = System.nanoTime()
val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0

val res = new JArrayList[String]()

@@ -31,7 +31,7 @@ import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.COMMAND
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.CommandResult
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
import org.apache.spark.sql.execution.HiveResult.hiveResultString
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}

@@ -82,10 +82,10 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
} catch {
case st: SparkThrowable =>
logDebug(s"Failed in [$command]", st)
new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(st), st.getSqlState, st)
throw st
case cause: Throwable =>
logError(log"Failed in [${MDC(COMMAND, command)}]", cause)
new CommandProcessorResponse(1, ExceptionUtils.getStackTrace(cause), null, cause)
throw new QueryExecutionException(ExceptionUtils.getStackTrace(cause))
}
}

@@ -1030,7 +1030,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
val metaStoreParts = partsWithLocation
.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
client.createPartitions(tableMeta, metaStoreParts, ignoreIfExists)

Review comment: why?

}

override def dropPartitions(
@@ -74,8 +74,9 @@ private[spark] object HiveUtils extends Logging {

val HIVE_METASTORE_VERSION = buildStaticConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
"<code>2.0.0</code> through <code>2.3.10</code> and " +
"<code>3.0.0</code> through <code>3.1.3</code>.")
"<code>2.0.0</code> through <code>2.3.10</code>, " +
"<code>3.0.0</code> through <code>3.1.3</code> and " +
"<code>4.0.0</code> through <code>4.0.1</code>.")
.version("1.4.0")
.stringConf
.checkValue(isCompatibleHiveVersion, "Unsupported Hive Metastore version")
@@ -164,8 +164,7 @@ private[hive] trait HiveClient {
* Create one or many partitions in the given table.
*/
def createPartitions(
db: String,
table: String,
table: CatalogTable,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit

@@ -17,7 +17,7 @@

package org.apache.spark.sql.hive.client

import java.io.PrintStream
import java.io.{OutputStream, PrintStream}
import java.lang.{Iterable => JIterable}
import java.lang.reflect.InvocationTargetException
import java.nio.charset.StandardCharsets.UTF_8
@@ -28,6 +28,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
@@ -44,7 +45,7 @@ import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.{SparkConf, SparkException, SparkThrowable}
import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
@@ -121,6 +122,7 @@ private[hive] class HiveClientImpl(
case hive.v2_3 => new Shim_v2_3()
case hive.v3_0 => new Shim_v3_0()
case hive.v3_1 => new Shim_v3_1()
case hive.v4_0 => new Shim_v4_0()
}

// Create an internal session state for this HiveClientImpl.
@@ -177,8 +179,10 @@ private[hive] class HiveClientImpl(
// got changed. We reset it to clientLoader.ClassLoader here.
state.getConf.setClassLoader(clientLoader.classLoader)
shim.setCurrentSessionState(state)
state.out = new PrintStream(outputBuffer, true, UTF_8.name())
state.err = new PrintStream(outputBuffer, true, UTF_8.name())
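// In Hive 4.0 the declared type of SessionState.out/err is no longer a plain java.io.PrintStream,
// so assigning a PrintStream directly does not compile against the 4.0 client jar; instead, build
// an instance of whatever PrintStream subclass the field actually declares, via its
// (OutputStream, boolean, encoding) constructor.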
val clz = state.getClass.getField("out").getType.asInstanceOf[Class[_ <: PrintStream]]
val ctor = clz.getConstructor(classOf[OutputStream], classOf[Boolean], classOf[String])
state.getClass.getField("out").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name()))

Review comment: the build fails with "the result type of an implicit conversion must be more specific than Object".

state.getClass.getField("err").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name()))
state
}

@@ -307,15 +311,27 @@ private[hive] class HiveClientImpl(
}

def setOut(stream: PrintStream): Unit = withHiveState {
state.out = stream
val ctor = state.getClass.getField("out")
.getType
.asInstanceOf[Class[_ <: PrintStream]]
.getConstructor(classOf[OutputStream])
state.getClass.getField("out").set(state, ctor.newInstance(stream))
}

def setInfo(stream: PrintStream): Unit = withHiveState {
state.info = stream
val ctor = state.getClass.getField("info")
.getType
.asInstanceOf[Class[_ <: PrintStream]]
.getConstructor(classOf[OutputStream])
state.getClass.getField("info").set(state, ctor.newInstance(stream))
}

def setError(stream: PrintStream): Unit = withHiveState {
state.err = stream
val ctor = state.getClass.getField("err")
.getType
.asInstanceOf[Class[_ <: PrintStream]]
.getConstructor(classOf[OutputStream])
state.getClass.getField("err").set(state, ctor.newInstance(stream))
}

private def setCurrentDatabaseRaw(db: String): Unit = {
@@ -629,21 +645,22 @@ private[hive] class HiveClientImpl(
}

override def createPartitions(
db: String,
table: String,
table: CatalogTable,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = withHiveState {
def replaceExistException(e: Throwable): Unit = e match {
case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
val hiveTable = client.getTable(db, table)
val db = table.identifier.database.getOrElse(state.getCurrentDatabase)
val tableName = table.identifier.table
val hiveTable = client.getTable(db, tableName)
val existingParts = parts.filter { p =>
shim.getPartitions(client, hiveTable, p.spec.asJava).nonEmpty
}
throw new PartitionsAlreadyExistException(db, table, existingParts.map(_.spec))
throw new PartitionsAlreadyExistException(db, tableName, existingParts.map(_.spec))
case _ => throw e
}
try {
shim.createPartitions(client, db, table, parts, ignoreIfExists)
shim.createPartitions(client, toHiveTable(table), parts, ignoreIfExists)
} catch {
case e: InvocationTargetException => replaceExistException(e.getCause)
case e: Throwable => replaceExistException(e)
@@ -861,11 +878,22 @@ private[hive] class HiveClientImpl(
// Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed
// and the CommandProcessorFactory.clean function removed.
driver.getClass.getMethod("close").invoke(driver)
if (version != hive.v3_0 && version != hive.v3_1) {
if (version != hive.v3_0 && version != hive.v3_1 && version != hive.v4_0) {
CommandProcessorFactory.clean(conf)
}
}

def getResponseCode(response: CommandProcessorResponse): Int = {
if (version < hive.v4_0) {
response.getResponseCode
} else {
// Since Hive 4.0, response code is removed from CommandProcessorResponse.
// Here we simply return 0 for the successful cases, since error cases throw
// exceptions earlier.
0
}
}

// Hive query needs to start SessionState.
SessionState.start(state)
logDebug(s"Running hiveql '$cmd'")
@@ -878,30 +906,44 @@ private[hive] class HiveClientImpl(
val proc = shim.getCommandProcessor(tokens(0), conf)
proc match {
case driver: Driver =>
val response: CommandProcessorResponse = driver.run(cmd)
// Throw an exception if there is an error in query processing.
if (response.getResponseCode != 0) {
try {
val response: CommandProcessorResponse = driver.run(cmd)
if (getResponseCode(response) != 0) {
// Throw an exception if there is an error in query processing.
// This works for hive 3.x and earlier versions.
throw new QueryExecutionException(response.getErrorMessage)
}
driver.setMaxRows(maxRows)
val results = shim.getDriverResults(driver)
results
} catch {
case e @ (_: QueryExecutionException | _: SparkThrowable) =>
throw e
case e: Exception =>
// Wrap the original hive error with QueryExecutionException and throw it
// if there is an error in query processing.
// This works for hive 4.x and later versions.
throw new QueryExecutionException(ExceptionUtils.getStackTrace(e))
} finally {
closeDriver(driver)
throw new QueryExecutionException(response.getErrorMessage)
}
driver.setMaxRows(maxRows)

val results = shim.getDriverResults(driver)
closeDriver(driver)
results

case _ =>
if (state.out != null) {
val out = state.getClass.getField("out").get(state)
if (out != null) {
// scalastyle:off println
state.out.println(tokens(0) + " " + cmd_1)
out.asInstanceOf[PrintStream].println(tokens(0) + " " + cmd_1)
// scalastyle:on println
}
val response: CommandProcessorResponse = proc.run(cmd_1)
// Throw an exception if there is an error in query processing.
if (response.getResponseCode != 0) {
val responseCode = getResponseCode(response)
if (responseCode != 0) {
// Throw an exception if there is an error in query processing.
// This works for hive 3.x and earlier versions. For 4.x and later versions,
// it will go to the catch block directly.
throw new QueryExecutionException(response.getErrorMessage)
}
Seq(response.getResponseCode.toString)
Seq(responseCode.toString)
}
} catch {
case e: Exception =>
@@ -971,7 +1013,7 @@ private[hive] class HiveClientImpl(
partSpec,
replace,
numDP,
listBucketingEnabled = hiveTable.isStoredAsSubDirectories)
hiveTable)
}

override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
292 changes: 277 additions & 15 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -25,14 +25,15 @@ import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{IMetaStoreClient, PartitionDropOptions, TableType}
import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, Index, MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.io.AcidUtils
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.plan.{AddPartitionDesc, DynamicPartitionCtx}
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
@@ -42,15 +43,16 @@ import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2, CONFIG3}
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, CatalogTablePartition, CatalogUtils, ExternalCatalogUtils, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateFormatter, TypeUtils}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{AtomicType, DateType, IntegralType, IntegralTypeExpression, StringType}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

/**
* A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
@@ -161,8 +163,7 @@ private[client] sealed abstract class Shim {

def createPartitions(
hive: Hive,
dbName: String,
tableName: String,
table: Table,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit

@@ -196,7 +197,7 @@ private[client] sealed abstract class Shim {
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit
hiveTable: Table): Unit

def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit

@@ -258,6 +259,8 @@ private[client] class Shim_v2_0 extends Shim with Logging {
// txnId can be 0 unless isAcid == true
protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L

protected lazy val wildcard: String = ".*"

override def getMSC(hive: Hive): IMetaStoreClient = hive.getMSC

private lazy val loadPartitionMethod =
@@ -318,11 +321,10 @@ private[client] class Shim_v2_0 extends Shim with Logging {

override def createPartitions(
hive: Hive,
dbName: String,
tableName: String,
table: Table,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val addPartitionDesc = new AddPartitionDesc(dbName, tableName, ignoreIfExists)
val addPartitionDesc = new AddPartitionDesc(table.getDbName, table.getTableName, ignoreIfExists)
parts.zipWithIndex.foreach { case (s, i) =>
addPartitionDesc.addPartition(
s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString).orNull)
@@ -504,8 +506,9 @@ private[client] class Shim_v2_0 extends Shim with Logging {
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
hiveTable: Table): Unit = {
recordHiveCall()
val listBucketingEnabled = hiveTable.isStoredAsSubDirectories
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions)
}
@@ -819,13 +822,13 @@ private[client] class Shim_v2_0 extends Shim with Logging {
Some(s"$value ${op.symbol} $name")

case Contains(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name like " + (("\".*" + value.drop(1)).dropRight(1) + ".*\""))
Some(s"$name like " + (("\"" + wildcard + value.drop(1)).dropRight(1) + wildcard + "\""))

case StartsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name like " + (value.dropRight(1) + ".*\""))
Some(s"$name like " + (value.dropRight(1) + wildcard + "\""))

case EndsWith(ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
Some(s"$name like " + ("\".*" + value.drop(1)))
Some(s"$name like " + ("\"" + wildcard + value.drop(1)))

case And(expr1, expr2) if useAdvanced =>
val converted = convert(expr1) ++ convert(expr2)
@@ -1096,8 +1099,9 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
hiveTable: Table): Unit = {
recordHiveCall()
val listBucketingEnabled = hiveTable.isStoredAsSubDirectories
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions,
hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID)
@@ -1262,7 +1266,7 @@ private[client] class Shim_v3_0 extends Shim_v2_3 {
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
hiveTable: Table): Unit = {
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
@@ -1278,3 +1282,261 @@ private[client] class Shim_v3_0 extends Shim_v2_3 {
}

private[client] class Shim_v3_1 extends Shim_v3_0

private[client] class Shim_v4_0 extends Shim_v3_1 {
private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass(
"org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType")
private lazy val clazzLoadTableDesc = getClass.getClassLoader.loadClass(
"org.apache.hadoop.hive.ql.plan.LoadTableDesc")
private lazy val clazzPartitionDetails =
Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities$PartitionDesc")

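// Hive 4.0 expects SQL LIKE wildcards in the pushed-down `like` filter, so use "%" rather
// than the regex-style ".*" used by the earlier shims.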
override protected lazy val wildcard: String = "%"

private lazy val alterTableMethod =
findMethod(
classOf[Hive],
"alterTable",
classOf[String],
classOf[Table],
classOf[EnvironmentContext],
JBoolean.TYPE)
private lazy val loadTableMethod =
findMethod(
classOf[Hive],
"loadTable",
classOf[Path],
classOf[String],
clazzLoadFileType,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
classOf[JLong],
JInteger.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val addPartitionsMethod =
findMethod(
classOf[Hive],
"addPartitions",
classOf[JList[Partition]],
JBoolean.TYPE,
JBoolean.TYPE) // needResults
private lazy val alterPartitionsMethod =
findMethod(
classOf[Hive],
"alterPartitions",
classOf[String],
classOf[JList[Partition]],
classOf[EnvironmentContext],
JBoolean.TYPE)
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
"loadPartition",
classOf[Path],
classOf[Table],
classOf[JMap[String, String]],
clazzLoadFileType,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE,
classOf[JLong],
JInteger.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
"loadDynamicPartitions",
clazzLoadTableDesc,
JInteger.TYPE, // numLB number of buckets
JBoolean.TYPE, // isAcid true if this is an ACID operation
JLong.TYPE, // writeId, can be 0 unless isAcid == true
JInteger.TYPE, // stmtId
JBoolean.TYPE, // resetStatistics
classOf[AcidUtils.Operation],
classOf[JMap[Path, clazzPartitionDetails.type]])
private lazy val renamePartitionMethod =
findMethod(
classOf[Hive],
"renamePartition",
classOf[Table],
classOf[JMap[String, String]],
classOf[Partition],
JLong.TYPE)

override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
recordHiveCall()
val transactional = false
alterTableMethod.invoke(
hive,
tableName,
table,
environmentContextInAlterTable,
transactional: JBoolean
)
}

override def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
}
assert(loadFileType.isDefined)
recordHiveCall()
val resetStatistics = false
val isDirectInsert = false
loadTableMethod.invoke(
hive,
loadPath,
tableName,
loadFileType.get,
isSrcLocal: JBoolean,
isSkewedStoreAsSubdir,
isAcidIUDoperation,
resetStatistics,
writeIdInLoadTableOrPartition,
stmtIdInLoadTableOrPartition: JInteger,
replace: JBoolean,
isDirectInsert: JBoolean)
}

override def alterPartitions(
hive: Hive,
tableName: String,
newParts: JList[Partition]): Unit = {
recordHiveCall()
val transactional: JBoolean = false
alterPartitionsMethod.invoke(
hive,
tableName,
newParts,
environmentContextInAlterTable,
transactional)
}

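// Hive 4.0 no longer exposes AddPartitionDesc the way the earlier shims use it, so convert the
// partitions to Thrift Partition objects directly and add them through Hive.addPartitions.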
override def createPartitions(
hive: Hive,
table: Table,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val partitions = parts.map(HiveClientImpl.toHivePartition(_, table).getTPartition).asJava
recordHiveCall()
val needResults = false
addPartitionsMethod.invoke(hive, partitions, ignoreIfExists, needResults: JBoolean)
}

override def loadPartition(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
recordHiveCall()
val table = hive.getTable(tableName)
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
}
assert(loadFileType.isDefined)
val inheritLocation: JBoolean = false
val isDirectInsert: JBoolean = false
recordHiveCall()
loadPartitionMethod.invoke(
hive,
loadPath,
table,
partSpec,
loadFileType.get,
inheritTableSpecs: JBoolean,
inheritLocation,
isSkewedStoreAsSubdir: JBoolean,
isSrcLocal: JBoolean,
isAcid,
hasFollowingStatsTask,
writeIdInLoadTableOrPartition,
stmtIdInLoadTableOrPartition,
replace: JBoolean,
isDirectInsert
)
}

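// In Hive 4.0, Hive.loadDynamicPartitions takes a LoadTableDesc plus the discovered partition
// specs (computed here via Utilities.getFullDPSpecs and a DynamicPartitionCtx) instead of the
// flat (loadPath, tableName, partSpec, ...) argument list used by the 3.x shims, so both are
// built reflectively below.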
override def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
tableName: String,
partSpec: JMap[String, String],
replace: Boolean,
numDP: Int,
hiveTable: Table): Unit = {
import org.apache.hadoop.hive.ql.exec.Utilities
val loadFileType = if (replace) {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL"))
} else {
clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING"))
}
assert(loadFileType.isDefined)

val useAppendForLoad: JBoolean = false
val loadTableDesc = clazzLoadTableDesc
.getConstructor(
classOf[Path],
classOf[Table],
JBoolean.TYPE,
JBoolean.TYPE,
classOf[JMap[String, String]])
.newInstance(
loadPath,
hiveTable,
replace: JBoolean,
useAppendForLoad,
partSpec)
val ctx = new DynamicPartitionCtx()
ctx.setRootPath(loadPath)
ctx.setNumDPCols(numDP)
ctx.setPartSpec(partSpec)

val fullDPSpecs = classOf[Utilities].getMethod(
"getFullDPSpecs",
classOf[Configuration],
classOf[DynamicPartitionCtx],
classOf[java.util.Map[String, java.util.List[Path]]])
recordHiveCall()
val resetPartitionStats: JBoolean = false
loadDynamicPartitionsMethod.invoke(
hive,
loadTableDesc,
listBucketingLevel,
isAcid,
writeIdInLoadTableOrPartition,
stmtIdInLoadTableOrPartition,
resetPartitionStats,
AcidUtils.Operation.NOT_ACID,
fullDPSpecs.invoke(null, hive.getConf, ctx, null)
)
}

override def renamePartition(
hive: Hive,
table: Table,
oldPartSpec: JMap[String, String],
newPart: Partition): Unit = {
recordHiveCall()
renamePartitionMethod.invoke(hive, table, oldPartSpec, newPart, writeIdInLoadTableOrPartition)
}
}
@@ -98,6 +98,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case (2, 3, _) => Some(hive.v2_3)
case (3, 0, _) => Some(hive.v3_0)
case (3, 1, _) => Some(hive.v3_1)
case (4, 0, _) => Some(hive.v4_0)
case _ => None
}.getOrElse {
throw QueryExecutionErrors.unsupportedHiveMetastoreVersionError(
@@ -87,8 +87,22 @@ package object client {
"org.pentaho:pentaho-aggdesigner-algorithm",
"org.apache.hive:hive-vector-code-gen"))

case object v4_0 extends HiveVersion("4.0.1",
extraDeps = Seq("org.apache.hadoop:hadoop-hdfs:3.3.6",
"org.datanucleus:datanucleus-api-jdo:5.2.8",
"org.datanucleus:datanucleus-rdbms:5.2.10",
"org.datanucleus:javax.jdo:3.2.0-release",
"org.springframework:spring-core:5.3.21",
"org.springframework:spring-jdbc:5.3.21",
"org.antlr:antlr4-runtime:4.9.3",
"org.apache.derby:derby:10.14.2.0"),
exclusions = Seq("org.apache.calcite:calcite-druid",
"org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm",
"org.apache.hive:hive-vector-code-gen"))

val allSupportedHiveVersions: Set[HiveVersion] =
Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1, v4_0)
}
// scalastyle:on

@@ -163,7 +163,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
// test alter database location
val tempDatabasePath2 = Utils.createTempDir().toURI
// Hive support altering database location since HIVE-8472.
if (version == "3.0" || version == "3.1") {
if (version == "3.0" || version == "3.1" || version == "4.0") {
client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
val uriInCatalog = client.getDatabase("temporary").locationUri
assert("file" === uriInCatalog.getScheme)
@@ -376,7 +376,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
CatalogTablePartition(Map("key1" -> "1", "key2" -> key2.toString), storageFormat)
}
client.createPartitions(
"default", "src_part", partitions, ignoreIfExists = true)
client.getTable("default", "src_part"), partitions, ignoreIfExists = true)
}

test("getPartitionNames(catalogTable)") {
@@ -479,10 +479,12 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {
val partitions = Seq(CatalogTablePartition(
Map("key1" -> "101", "key2" -> "102"),
storageFormat))
val table = client.getTable("default", "src_part")

try {
client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
client.createPartitions(table, partitions, ignoreIfExists = false)
val e = intercept[PartitionsAlreadyExistException] {
client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
client.createPartitions(table, partitions, ignoreIfExists = false)
}
checkError(e,
condition = "PARTITIONS_ALREADY_EXIST",
@@ -558,7 +560,7 @@ class HiveClientSuite(version: String) extends HiveVersionSuite(version) {

test("sql create index and reset") {
// HIVE-18448 Since Hive 3.0, INDEX is not supported.
if (version != "3.0" && version != "3.1") {
if (version != "3.0" && version != "3.1" && version != "4.0") {
client.runSqlHive("CREATE TABLE indexed_table (key INT)")
client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
"as 'COMPACT' WITH DEFERRED REBUILD")
@@ -22,6 +22,6 @@ private[client] trait HiveClientVersions {
protected val versions = if (testVersions.nonEmpty) {
testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq
} else {
IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1")
IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0")
}
}
@@ -111,8 +111,7 @@ class HivePartitionFilteringSuite(version: String)
), storageFormat)
assert(partitions.size == testPartitionCount)

client.createPartitions(
"default", "test", partitions, ignoreIfExists = false)
client.createPartitions(table, partitions, ignoreIfExists = false)
client
}

@@ -31,14 +31,11 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" ||
version == "3.0" || version == "3.1") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored")
hadoopConf.set("hive.metastore.schema.verification", "false")
// Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`.
if (version == "3.0" || version == "3.1") {
if (version == "3.0" || version == "3.1" || version == "4.0") {
hadoopConf.set("hive.in.test", "true")
hadoopConf.set("hive.query.reexecution.enabled", "false")
}