Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
import java.net.URI
import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
Expand Down Expand Up @@ -136,12 +137,62 @@ private[hive] class ClientWrapper(

// TODO: should be a def?s
// When we create this val client, the HiveConf of it (conf) is the one associated with state.
private val client = Hive.get(conf)
@GuardedBy("this")
private var client = Hive.get(conf)

// We use hive's conf for compatibility.
private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)

/**
* Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
*/
private def retryLocked[A](f: => A): A = synchronized {
// Hive sometimes retries internally, so set a deadline to avoid compounding delays.
val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
var numTries = 0
var caughtException: Exception = null
do {
numTries += 1
try {
return f
} catch {
case e: Exception if causedByThrift(e) =>
caughtException = e
logWarning(
"HiveClientWrapper got thrift exception, destroying client and retrying " +
s"(${retryLimit - numTries} tries remaining)", e)
Thread.sleep(retryDelayMillis)
try {
client = Hive.get(state.getConf, true)
} catch {
case e: Exception if causedByThrift(e) =>
logWarning("Failed to refresh hive client, will retry.", e)
}
}
} while (numTries <= retryLimit && System.nanoTime < deadline)
if (System.nanoTime > deadline) {
logWarning("Deadline exceeded")
}
throw caughtException
}

private def causedByThrift(e: Throwable): Boolean = {
var target = e
while (target != null) {
val msg = target.getMessage()
if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
return true
}
target = target.getCause()
}
false
}

/**
* Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
*/
private def withHiveState[A](f: => A): A = synchronized {
private def withHiveState[A](f: => A): A = retryLocked {
val original = Thread.currentThread().getContextClassLoader
// Set the thread local metastore client to the client associated with this ClientWrapper.
Hive.set(client)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.lang.{Boolean => JBoolean, Integer => JInteger}
import java.lang.reflect.{Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -64,6 +65,8 @@ private[client] sealed abstract class Shim {

def getDriverResults(driver: Driver): Seq[String]

def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long

def loadPartition(
hive: Hive,
loadPath: Path,
Expand Down Expand Up @@ -192,6 +195,10 @@ private[client] class Shim_v0_12 extends Shim {
res.toSeq
}

override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000
}

override def loadPartition(
hive: Hive,
loadPath: Path,
Expand Down Expand Up @@ -321,6 +328,12 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
private lazy val getTimeVarMethod =
findMethod(
classOf[HiveConf],
"getTimeVar",
classOf[HiveConf.ConfVars],
classOf[TimeUnit])

override def loadPartition(
hive: Hive,
Expand Down Expand Up @@ -359,4 +372,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
}

override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
getTimeVarMethod.invoke(
conf,
HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
TimeUnit.MILLISECONDS).asInstanceOf[Long]
}
}