From 92eadb598a078c3d302b603def70afaf0b54709f Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 21 Apr 2016 14:37:46 -0700
Subject: [PATCH 1/5] Squashed commit of the following:

commit 0e16190717a0c2a417c54b870558fe9b65952ff0
Author: Andrew Or
Date:   Thu Apr 21 14:21:07 2016 -0700

    Fix a few TODOs after rebase

commit 2e1d1610873f4a7e1d2d94b21b9ff65ec60f3270
Merge: 989e770 a2e8d4f
Author: Andrew Or
Date:   Thu Apr 21 14:19:08 2016 -0700

    Merge branch 'master' of github.com:apache/spark into delete-hive-context

commit 989e770185bab7c1c70738cd3170112c73bf9b1d
Author: Andrew Or
Date:   Thu Apr 21 14:13:29 2016 -0700

    Delete HiveContext class

commit ece4ab7809f66f49c1c672b1eb3027ef269c7ba5
Author: Andrew Or
Date:   Thu Apr 21 14:11:04 2016 -0700

    Delete all code usages of HiveContext
---
 .../examples/sql/hive/HiveFromSpark.scala     |  7 ++--
 .../org/apache/spark/sql/SparkSession.scala   |  8 +++-
 .../hive/thriftserver/HiveThriftServer2.scala | 17 ++++----
 .../SparkExecuteStatementOperation.scala      | 22 +++++-----
 .../hive/thriftserver/SparkSQLCLIDriver.scala |  2 +-
 .../thriftserver/SparkSQLCLIService.scala     |  8 ++--
 .../hive/thriftserver/SparkSQLDriver.scala    |  6 +--
 .../sql/hive/thriftserver/SparkSQLEnv.scala   | 22 +++++-----
 .../thriftserver/SparkSQLSessionManager.scala | 12 +++---
 .../server/SparkSQLOperationManager.scala     | 12 +++---
 .../apache/spark/sql/hive/HiveContext.scala   | 41 +------------------
 .../spark/sql/hive/HiveSessionState.scala     |  2 +-
 .../spark/sql/hive/HiveSharedState.scala      |  3 +-
 .../hive/execution/CreateTableAsSelect.scala  |  2 +-
 .../apache/spark/sql/hive/test/TestHive.scala |  2 +-
 .../spark/sql/hive/JavaDataFrameSuite.java    |  2 +-
 .../hive/JavaMetastoreDataSourcesSuite.java   |  7 ++--
 .../regression-test-SPARK-8489/Main.scala     |  9 ++--
 18 files changed, 80 insertions(+), 104 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index b654a2c8d4a40..ff33091621c14 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -24,7 +24,6 @@ import com.google.common.io.{ByteStreams, Files}
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
-import org.apache.spark.sql.hive.HiveContext
 
 object HiveFromSpark {
   case class Record(key: Int, value: String)
@@ -43,9 +42,9 @@ object HiveFromSpark {
     // using HiveQL. Users who do not have an existing Hive deployment can still create a
     // HiveContext. When not configured by the hive-site.xml, the context automatically
     // creates metastore_db and warehouse in the current directory.
- val hiveContext = new HiveContext(sc) - import hiveContext.implicits._ - import hiveContext.sql + val sparkSession = SparkSession.withHiveSupport(sc) + import sparkSession.implicits._ + import sparkSession.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 70d889b002e43..2fd7677f346ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -904,7 +904,7 @@ class SparkSession private( } -private object SparkSession { +object SparkSession { private def sharedStateClassName(conf: SparkConf): String = { conf.get(CATALOG_IMPLEMENTATION) match { @@ -937,4 +937,10 @@ private object SparkSession { } } + // TODO: do we want to expose this? + def withHiveSupport(sc: SparkContext): SparkSession = { + sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive") + new SparkSession(sc) + } + } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 6703cdbac3d17..24a25023a6e36 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -33,7 +33,8 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.HiveSessionState import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab import org.apache.spark.sql.internal.SQLConf @@ -53,9 +54,9 @@ object HiveThriftServer2 extends Logging { * Starts a new thrift server with the given context. 
*/ @DeveloperApi - def startWithContext(sqlContext: HiveContext): Unit = { + def startWithContext(sqlContext: SQLContext): Unit = { val server = new HiveThriftServer2(sqlContext) - server.init(sqlContext.sessionState.hiveconf) + server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) server.start() listener = new HiveThriftServer2Listener(server, sqlContext.conf) sqlContext.sparkContext.addSparkListener(listener) @@ -82,11 +83,11 @@ object HiveThriftServer2 extends Logging { } try { - val server = new HiveThriftServer2(SparkSQLEnv.hiveContext) - server.init(SparkSQLEnv.hiveContext.sessionState.hiveconf) + val server = new HiveThriftServer2(SparkSQLEnv.sqlContext) + server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) server.start() logInfo("HiveThriftServer2 started") - listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf) + listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf) SparkSQLEnv.sparkContext.addSparkListener(listener) uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) { Some(new ThriftServerTab(SparkSQLEnv.sparkContext)) @@ -261,7 +262,7 @@ object HiveThriftServer2 extends Logging { } } -private[hive] class HiveThriftServer2(hiveContext: HiveContext) +private[hive] class HiveThriftServer2(sqlContext: SQLContext) extends HiveServer2 with ReflectedCompositeService { // state is tracked internally so that the server only attempts to shut down if it successfully @@ -269,7 +270,7 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext) private val started = new AtomicBoolean(false) override def init(hiveConf: HiveConf) { - val sparkSqlCliService = new SparkSQLCLIService(this, hiveContext) + val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index d89c3b4ab2d1c..91e20c00262e1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -33,9 +33,9 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Row => SparkRow} +import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext} import org.apache.spark.sql.execution.command.SetCommand -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} +import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveSessionState} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{Utils => SparkUtils} @@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation( statement: String, confOverlay: JMap[String, String], runInBackground: Boolean = true) - (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String]) + (sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String]) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with Logging { @@ -68,7 +68,7 @@ private[hive] 
class SparkExecuteStatementOperation( def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. - hiveContext.sparkContext.clearJobGroup() + sqlContext.sparkContext.clearJobGroup() logDebug(s"CLOSING $statementId") cleanup(OperationState.CLOSED) } @@ -193,9 +193,9 @@ private[hive] class SparkExecuteStatementOperation( statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) + val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = - hiveContext.sessionState.executionHive.state.getConf.getClassLoader + val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) HiveThriftServer2.listener.onStatementStart( @@ -204,12 +204,12 @@ private[hive] class SparkExecuteStatementOperation( statement, statementId, parentSession.getUsername) - hiveContext.sparkContext.setJobGroup(statementId, statement) + sqlContext.sparkContext.setJobGroup(statementId, statement) sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => - hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) + sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } try { - result = hiveContext.sql(statement) + result = sqlContext.sql(statement) logDebug(result.queryExecution.toString()) result.queryExecution.logical match { case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) => @@ -220,7 +220,7 @@ private[hive] class SparkExecuteStatementOperation( HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString()) iter = { val useIncrementalCollect = - hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean if (useIncrementalCollect) { result.toLocalIterator.asScala } else { @@ -253,7 +253,7 @@ private[hive] class SparkExecuteStatementOperation( override def cancel(): Unit = { logInfo(s"Cancel '$statement' with $statementId") if (statementId != null) { - hiveContext.sparkContext.cancelJobGroup(statementId) + sqlContext.sparkContext.cancelJobGroup(statementId) } cleanup(OperationState.CANCELED) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 57693284b01df..8acf85aac32fe 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -150,7 +150,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { } if (sessionState.database != null) { - SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase( + SparkSQLEnv.sqlContext.sessionState.catalog.setCurrentDatabase( s"${sessionState.database}") } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 6fe57554cf580..1b17a9a56e5b9 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -33,17 +33,17 @@ import org.apache.hive.service.auth.HiveAuthFactory import org.apache.hive.service.cli._ import org.apache.hive.service.server.HiveServer2 -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: HiveContext) +private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext) extends CLIService(hiveServer) with ReflectedCompositeService { override def init(hiveConf: HiveConf) { setSuperField(this, "hiveConf", hiveConf) - val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, hiveContext) + val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sqlContext) setSuperField(this, "sessionManager", sparkSqlSessionManager) addService(sparkSqlSessionManager) var sparkServiceUGI: UserGroupInformation = null @@ -66,7 +66,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: Hiv getInfoType match { case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL") case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL") - case GetInfoType.CLI_DBMS_VER => new GetInfoValue(hiveContext.sparkContext.version) + case GetInfoType.CLI_DBMS_VER => new GetInfoValue(sqlContext.sparkContext.version) case _ => super.getInfo(sessionHandle, getInfoType) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 7e8eada5adb4f..87bebb2bddd8b 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveQueryExecution} +import org.apache.spark.sql.{AnalysisException, SQLContext} +import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveQueryExecution} private[hive] class SparkSQLDriver( - val context: HiveContext = SparkSQLEnv.hiveContext) + val context: SQLContext = SparkSQLEnv.sqlContext) extends Driver with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 2679ac1854bb8..5ec69ac486f52 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -24,18 +24,19 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.StatsReportListener -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.hive.{HiveContext, HiveSessionState} import org.apache.spark.util.Utils /** A singleton object for the master program. The slaves should not access this. 
*/ private[hive] object SparkSQLEnv extends Logging { logDebug("Initializing SparkSQLEnv") - var hiveContext: HiveContext = _ + var sqlContext: SQLContext = _ var sparkContext: SparkContext = _ def init() { - if (hiveContext == null) { + if (sqlContext == null) { val sparkConf = new SparkConf(loadDefaults = true) val maybeSerializer = sparkConf.getOption("spark.serializer") val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking") @@ -56,16 +57,17 @@ private[hive] object SparkSQLEnv extends Logging { sparkContext = new SparkContext(sparkConf) sparkContext.addSparkListener(new StatsReportListener()) - hiveContext = new HiveContext(sparkContext) + sqlContext = SparkSession.withHiveSupport(sparkContext).wrapped + val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - hiveContext.sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) - hiveContext.sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) - hiveContext.sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) + sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) + sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) + sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) - hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) + sqlContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) if (log.isDebugEnabled) { - hiveContext.sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted + sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") } } } @@ -78,7 +80,7 @@ private[hive] object SparkSQLEnv extends Logging { if (SparkSQLEnv.sparkContext != null) { sparkContext.stop() sparkContext = null - hiveContext = null + sqlContext = null } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index f492b5656c3c3..17ccfbb9c4946 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -27,12 +27,13 @@ import org.apache.hive.service.cli.session.SessionManager import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.hive.service.server.HiveServer2 -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.{HiveContext, HiveSessionState} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager -private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: HiveContext) +private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) extends SessionManager(hiveServer) with ReflectedCompositeService { @@ -71,10 +72,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: val session = super.getSession(sessionHandle) HiveThriftServer2.listener.onSessionCreated( session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) - val ctx = if (hiveContext.sessionState.hiveThriftServerSingleSession) { - hiveContext + val sessionState = 
sqlContext.sessionState.asInstanceOf[HiveSessionState] + val ctx = if (sessionState.hiveThriftServerSingleSession) { + sqlContext } else { - hiveContext.newSession() + sqlContext.newSession() } ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index da410c68c851d..79625239dea0e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -26,7 +26,8 @@ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operati import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.HiveSessionState import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation} /** @@ -39,17 +40,18 @@ private[thriftserver] class SparkSQLOperationManager() .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = Map[SessionHandle, String]() - val sessionToContexts = Map[SessionHandle, HiveContext]() + val sessionToContexts = Map[SessionHandle, SQLContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { - val hiveContext = sessionToContexts(parentSession.getSessionHandle) - val runInBackground = async && hiveContext.sessionState.hiveThriftServerAsync + val sqlContext = sessionToContexts(parentSession.getSessionHandle) + val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] + val runInBackground = async && sessionState.hiveThriftServerAsync val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, - runInBackground)(hiveContext, sessionToActivePool) + runInBackground)(sqlContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index b2ce3e0df25b4..9119e8a1d8cd2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -35,7 +35,6 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION import org.apache.spark.sql._ @@ -45,46 +44,8 @@ import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -/** - * An instance of the Spark SQL execution engine that integrates with data stored in Hive. - * Configuration for Hive is read from hive-site.xml on the classpath. 
- * - * @since 1.0.0 - */ -class HiveContext private[hive]( - @transient private val sparkSession: SparkSession, - isRootContext: Boolean) - extends SQLContext(sparkSession, isRootContext) with Logging { - - self => - - def this(sc: SparkContext) = { - this(new SparkSession(HiveContext.withHiveExternalCatalog(sc)), true) - } - - def this(sc: JavaSparkContext) = this(sc.sc) - - /** - * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF, - * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader - * and Hive client (both of execution and metadata) with existing HiveContext. - */ - override def newSession(): HiveContext = { - new HiveContext(sparkSession.newSession(), isRootContext = false) - } - - protected[sql] override def sessionState: HiveSessionState = { - sparkSession.sessionState.asInstanceOf[HiveSessionState] - } - - protected[sql] override def sharedState: HiveSharedState = { - sparkSession.sharedState.asInstanceOf[HiveSharedState] - } - -} - -private[hive] object HiveContext extends Logging { +private[spark] object HiveContext extends Logging { def withHiveExternalCatalog(sc: SparkContext): SparkContext = { sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 6f4332c65f934..8094e5445147d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf} /** - * A class that holds all session-specific state in a given [[HiveContext]]. + * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive. */ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala index 11097c33df2d5..82492d1cf664f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala @@ -23,7 +23,8 @@ import org.apache.spark.sql.internal.SharedState /** - * A class that holds all state shared across sessions in a given [[HiveContext]]. + * A class that holds all state shared across sessions in a given + * [[org.apache.spark.sql.SparkSession]] backed by Hive. 
*/ private[hive] class HiveSharedState(override val sparkContext: SparkContext) extends SharedState(sparkContext) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index ceb7f3b890949..9c98587b6a61b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation} +import org.apache.spark.sql.hive.{HiveMetastoreTypes, MetastoreRelation} /** * Create table and insert the query result into it. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 2bb13996c145c..04dd643d4e5e0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -72,7 +72,7 @@ object TestHive * test cases that rely on TestHive must be serialized. */ class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootContext: Boolean) - extends HiveContext(sparkSession, isRootContext) { + extends SQLContext(sparkSession, isRootContext) { def this(sc: SparkContext) { this(new TestHiveSparkSession(HiveContext.withHiveExternalCatalog(sc)), true) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java index 397421ae92a47..64f2ded447a06 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java @@ -36,7 +36,7 @@ public class JavaDataFrameSuite { private transient JavaSparkContext sc; - private transient HiveContext hc; + private transient SQLContext hc; Dataset df; diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index 2fc38e2b2d2e7..f13c32db9d230 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -36,6 +36,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.QueryTest$; import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.hive.test.TestHive$; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; @@ -46,7 +47,7 @@ public class JavaMetastoreDataSourcesSuite { private transient JavaSparkContext sc; - private transient HiveContext sqlContext; + private transient SQLContext sqlContext; File path; Path hiveManagedPath; @@ -70,9 +71,9 @@ public void setUp() throws IOException { if (path.exists()) { path.delete(); } + HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog(); hiveManagedPath = new Path( - sqlContext.sessionState().catalog().hiveDefaultTableFilePath( - new 
TableIdentifier("javaSavedTable"))); + catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable"))); fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); if (fs.exists(hiveManagedPath)){ fs.delete(hiveManagedPath, true); diff --git a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala index 2590040f2ec1c..10a017df831e0 100644 --- a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala +++ b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala @@ -15,8 +15,8 @@ * limitations under the License. */ -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.SparkContext +import org.apache.spark.sql.SparkSession /** * Entry point in test application for SPARK-8489. @@ -28,15 +28,16 @@ import org.apache.spark.sql.hive.HiveContext * * This is used in org.apache.spark.sql.hive.HiveSparkSubmitSuite. */ +// TODO: actually rebuild this jar with the new changes. object Main { def main(args: Array[String]) { // scalastyle:off println println("Running regression test for SPARK-8489.") val sc = new SparkContext("local", "testing") - val hc = new HiveContext(sc) + val sparkSession = SparkSession.withHiveSupport(sc) // This line should not throw scala.reflect.internal.MissingRequirementError. // See SPARK-8470 for more detail. - val df = hc.createDataFrame(Seq(MyCoolClass("1", "2", "3"))) + val df = sparkSession.createDataFrame(Seq(MyCoolClass("1", "2", "3"))) df.collect() println("Regression test for SPARK-8489 success!") // scalastyle:on println From 9eb2cc7cebb110034ac37114795f43358a6c7b3a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 21 Apr 2016 14:39:49 -0700 Subject: [PATCH 2/5] Rename HiveContext object to HiveUtils --- .../SparkExecuteStatementOperation.scala | 4 ++-- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 4 ++-- .../spark/sql/hive/thriftserver/SparkSQLEnv.scala | 4 ++-- .../hive/thriftserver/SparkSQLSessionManager.scala | 4 ++-- .../thriftserver/HiveThriftServer2Suites.scala | 6 +++--- .../hive/execution/HiveCompatibilitySuite.scala | 6 +++--- .../apache/spark/sql/hive/HiveQueryExecution.scala | 2 +- .../apache/spark/sql/hive/HiveSessionState.scala | 10 +++++----- .../apache/spark/sql/hive/HiveSharedState.scala | 4 ++-- .../hive/{HiveContext.scala => HiveUtils.scala} | 10 +++++----- .../sql/hive/client/IsolatedClientLoader.scala | 4 ++-- .../org/apache/spark/sql/hive/test/TestHive.scala | 8 ++++---- .../spark/sql/hive/HiveExternalCatalogSuite.scala | 2 +- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 2 +- .../sql/hive/ParquetHiveCompatibilitySuite.scala | 2 +- .../spark/sql/hive/client/VersionsSuite.scala | 6 +++--- .../spark/sql/hive/execution/SQLQuerySuite.scala | 14 +++++++------- .../apache/spark/sql/hive/orc/OrcQuerySuite.scala | 4 ++-- .../org/apache/spark/sql/hive/parquetSuites.scala | 6 +++--- 19 files changed, 51 insertions(+), 51 deletions(-) rename sql/hive/src/main/scala/org/apache/spark/sql/hive/{HiveContext.scala => HiveUtils.scala} (98%) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 91e20c00262e1..37c14f8648b6d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext} import org.apache.spark.sql.execution.command.SetCommand -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveSessionState} +import org.apache.spark.sql.hive.{HiveUtils, HiveMetastoreTypes, HiveSessionState} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{Utils => SparkUtils} @@ -98,7 +98,7 @@ private[hive] class SparkExecuteStatementOperation( case TimestampType => to += from.getAs[Timestamp](ordinal) case BinaryType | _: ArrayType | _: StructType | _: MapType => - val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal))) + val hiveString = HiveUtils.toHiveString((from.get(ordinal), dataTypes(ordinal))) to += hiveString } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 8acf85aac32fe..1402e0a687290 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -39,7 +39,7 @@ import org.apache.thrift.transport.TSocket import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.util.ShutdownHookManager /** @@ -82,7 +82,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { val cliConf = new HiveConf(classOf[SessionState]) // Override the location of the metastore since this is only used for local execution. - HiveContext.newTemporaryConfiguration(useInMemoryDerby = false).foreach { + HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach { case (key, value) => cliConf.set(key, value) } val sessionState = new CliSessionState(cliConf) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 5ec69ac486f52..a649aabaa24c3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -25,7 +25,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.StatsReportListener import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.hive.{HiveContext, HiveSessionState} +import org.apache.spark.sql.hive.{HiveUtils, HiveSessionState} import org.apache.spark.util.Utils /** A singleton object for the master program. The slaves should not access this. 
*/ @@ -64,7 +64,7 @@ private[hive] object SparkSQLEnv extends Logging { sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) - sqlContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) + sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) if (log.isDebugEnabled) { sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 17ccfbb9c4946..2697c94fd7222 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -28,7 +28,7 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.hive.service.server.HiveServer2 import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.{HiveContext, HiveSessionState} +import org.apache.spark.sql.hive.{HiveUtils, HiveSessionState} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager @@ -78,7 +78,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: } else { sqlContext.newSession() } - ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) + ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx sessionHandle } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index ee14b6dc8d01d..bc45334036332 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -42,7 +42,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.internal.Logging -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.util.{ThreadUtils, Utils} @@ -115,7 +115,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val resultSet = statement.executeQuery("SET spark.sql.hive.version") resultSet.next() assert(resultSet.getString(1) === "spark.sql.hive.version") - assert(resultSet.getString(2) === HiveContext.hiveExecutionVersion) + assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion) } } @@ -624,7 +624,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { val resultSet = statement.executeQuery("SET spark.sql.hive.version") resultSet.next() assert(resultSet.getString(1) === "spark.sql.hive.version") - assert(resultSet.getString(2) === HiveContext.hiveExecutionVersion) + assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion) } } } diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 49fd19873017d..b7d6c26fbe154 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -23,7 +23,7 @@ import java.util.{Locale, TimeZone} import org.scalatest.BeforeAndAfter import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.internal.SQLConf @@ -60,7 +60,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.sessionState.functionRegistry.unregisterFunction("hash") // Ensures that the plans generation use metastore relation and not OrcRelation // Was done because SqlBuilder does not work with plans having logical relation - TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, false) + TestHive.setConf(HiveUtils.CONVERT_METASTORE_ORC, false) RuleExecutor.resetTime() } @@ -71,7 +71,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { Locale.setDefault(originalLocale) TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) - TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc) + TestHive.setConf(HiveUtils.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc) TestHive.sessionState.functionRegistry.restore() // For debugging dump some statistics about how much time was spent in various optimizer rules. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala index 1c1bfb610c29e..f1685d996ee2f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala @@ -53,7 +53,7 @@ protected[hive] class HiveQueryExecution(ctx: SQLContext, logicalPlan: LogicalPl // We need the types so we can output struct field names val types = analyzed.output.map(_.dataType) // Reformat to match hive tab delimited output. - result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq + result.map(_.zip(types).map(HiveUtils.toHiveString)).map(_.mkString("\t")).toSeq } override def simpleString: String = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 8094e5445147d..d464b10eee69e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -208,7 +208,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) * SerDe. */ def convertMetastoreParquet: Boolean = { - conf.getConf(HiveContext.CONVERT_METASTORE_PARQUET) + conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) } /** @@ -218,7 +218,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true. 
*/ def convertMetastoreParquetWithSchemaMerging: Boolean = { - conf.getConf(HiveContext.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING) + conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING) } /** @@ -227,7 +227,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) * SerDe. */ def convertMetastoreOrc: Boolean = { - conf.getConf(HiveContext.CONVERT_METASTORE_ORC) + conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) } /** @@ -243,14 +243,14 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) * and no SerDe is specified (no ROW FORMAT SERDE clause). */ def convertCTAS: Boolean = { - conf.getConf(HiveContext.CONVERT_CTAS) + conf.getConf(HiveUtils.CONVERT_CTAS) } /** * When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool." */ def hiveThriftServerAsync: Boolean = { - conf.getConf(HiveContext.HIVE_THRIFT_SERVER_ASYNC) + conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) } def hiveThriftServerSingleSession: Boolean = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala index 82492d1cf664f..fb1f59eed3407 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala @@ -35,7 +35,7 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext) * A Hive client used for execution. */ val executionHive: HiveClientImpl = { - HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration) + HiveUtils.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration) } /** @@ -43,7 +43,7 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext) */ // This needs to be a lazy val at here because TestHiveSharedState is overriding it. 
lazy val metadataHive: HiveClient = { - HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration) + HiveUtils.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration) } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala similarity index 98% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 9119e8a1d8cd2..a8561192ed849 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -private[spark] object HiveContext extends Logging { +private[spark] object HiveUtils extends Logging { def withHiveExternalCatalog(sc: SparkContext): SparkContext = { sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive") @@ -276,10 +276,10 @@ private[spark] object HiveContext extends Logging { configurations: Map[String, String]): HiveClient = { val sqlConf = new SQLConf sqlConf.setConf(SQLContext.getSQLProperties(conf)) - val hiveMetastoreVersion = HiveContext.hiveMetastoreVersion(sqlConf) - val hiveMetastoreJars = HiveContext.hiveMetastoreJars(sqlConf) - val hiveMetastoreSharedPrefixes = HiveContext.hiveMetastoreSharedPrefixes(sqlConf) - val hiveMetastoreBarrierPrefixes = HiveContext.hiveMetastoreBarrierPrefixes(sqlConf) + val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) + val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) + val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf) + val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 7e0d1b446ff5f..0380d2342be33 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -32,7 +32,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkSubmitUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.quietly -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.util.{MutableURLClassLoader, Utils} /** Factory for `IsolatedClientLoader` with specific versions of hive. 
*/ @@ -263,7 +263,7 @@ private[hive] class IsolatedClientLoader( throw new ClassNotFoundException( s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" + "Please make sure that jars for your version of hive and hadoop are included in the " + - s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.", e) + s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS}.", e) } else { throw e } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 04dd643d4e5e0..908da95830cf3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -75,7 +75,7 @@ class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootC extends SQLContext(sparkSession, isRootContext) { def this(sc: SparkContext) { - this(new TestHiveSparkSession(HiveContext.withHiveExternalCatalog(sc)), true) + this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)), true) } override def newSession(): TestHiveContext = { @@ -118,7 +118,7 @@ private[hive] class TestHiveSparkSession( sc, Utils.createTempDir(namePrefix = "warehouse"), TestHiveContext.makeScratchDir(), - HiveContext.newTemporaryConfiguration(useInMemoryDerby = false), + HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false), None) } @@ -577,7 +577,7 @@ private[hive] object TestHiveContext { scratchDirPath: File, metastoreTemporaryConf: Map[String, String]): HiveClient = { val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf]) - HiveContext.newClientForMetadata( + HiveUtils.newClientForMetadata( conf, hiveConf, hadoopConf, @@ -592,7 +592,7 @@ private[hive] object TestHiveContext { warehousePath: File, scratchDirPath: File, metastoreTemporaryConf: Map[String, String]): Map[String, String] = { - HiveContext.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map( + HiveUtils.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map( ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString, ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true", ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index 84285b7f40832..cb60a2c8cff97 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -31,7 +31,7 @@ class HiveExternalCatalogSuite extends CatalogTestCases { private val client: HiveClient = { // We create a metastore at a temp location to avoid any potential // conflict of having multiple connections to a single derby instance. 
- HiveContext.newClientForExecution(new SparkConf, new Configuration) + HiveUtils.newClientForExecution(new SparkConf, new Configuration) } protected override val utils: CatalogTestUtils = new CatalogTestUtils { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index bbe135b2d6980..dc87daae7248e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -550,7 +550,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } test("scan a parquet table created through a CTAS statement") { - withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "true") { + withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") { withTempTable("jt") { (1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index a9823ae26278d..d78914505a03d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -53,7 +53,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil // Don't convert Hive metastore Parquet tables to let Hive write those Parquet files. - withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") { + withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") { withTempTable("data") { val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s" col_$index $typ" } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 8b0719209dedf..e0288ff98f42c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression} import org.apache.spark.sql.catalyst.util.quietly -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.types.IntegerType import org.apache.spark.tags.ExtendedHiveTest import org.apache.spark.util.Utils @@ -62,7 +62,7 @@ class VersionsSuite extends SparkFunSuite with Logging { test("success sanity check") { val badClient = IsolatedClientLoader.forVersion( - hiveMetastoreVersion = HiveContext.hiveExecutionVersion, + hiveMetastoreVersion = HiveUtils.hiveExecutionVersion, hadoopVersion = VersionInfo.getVersion, sparkConf = sparkConf, hadoopConf = new Configuration(), @@ -76,7 +76,7 @@ class VersionsSuite extends SparkFunSuite with Logging { val hadoopConf = new Configuration(); hadoopConf.set("test", "success") val client = IsolatedClientLoader.forVersion( - hiveMetastoreVersion = HiveContext.hiveExecutionVersion, + hiveMetastoreVersion = HiveUtils.hiveExecutionVersion, hadoopVersion = VersionInfo.getVersion, sparkConf = sparkConf, hadoopConf = hadoopConf, diff 
--git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 345ee8ef28eae..2e14aaa6d7118 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} +import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -351,7 +351,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val originalConf = sessionState.convertCTAS - setConf(HiveContext.CONVERT_CTAS, true) + setConf(HiveUtils.CONVERT_CTAS, true) try { sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value") @@ -395,7 +395,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { checkRelation("ctas1", false) sql("DROP TABLE ctas1") } finally { - setConf(HiveContext.CONVERT_CTAS, originalConf) + setConf(HiveUtils.CONVERT_CTAS, originalConf) sql("DROP TABLE IF EXISTS ctas1") } } @@ -470,7 +470,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { | FROM src | ORDER BY key, value""".stripMargin).collect() - withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") { + withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") { checkExistence(sql("DESC EXTENDED ctas5"), true, "name:key", "type:string", "name:value", "ctas5", "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", @@ -481,7 +481,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } // use the Hive SerDe for parquet tables - withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") { + withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") { checkAnswer( sql("SELECT key, value FROM ctas5 ORDER BY key, value"), sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq) @@ -732,7 +732,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""")) read.json(rdd).registerTempTable("data") val originalConf = sessionState.convertCTAS - setConf(HiveContext.CONVERT_CTAS, false) + setConf(HiveUtils.CONVERT_CTAS, false) try { sql("CREATE TABLE explodeTest (key bigInt)") @@ -751,7 +751,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("DROP TABLE explodeTest") dropTempTable("data") } finally { - setConf(HiveContext.CONVERT_CTAS, originalConf) + setConf(HiveUtils.CONVERT_CTAS, originalConf) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 5ef8194f28881..4fb78ac02cb55 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll import 
org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.internal.SQLConf @@ -406,7 +406,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("SPARK-14070 Use ORC data source for SQL queries on ORC tables") { withTempPath { dir => withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true", - HiveContext.CONVERT_METASTORE_ORC.key -> "true") { + HiveUtils.CONVERT_METASTORE_ORC.key -> "true") { val path = dir.getCanonicalPath withTable("dummy_orc") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 6fa4c3334fa72..2984ee99bee77 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -174,7 +174,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { (1 to 10).map(i => (i, s"str$i")).toDF("a", "b").registerTempTable("jt") (1 to 10).map(i => Tuple1(Seq(new Integer(i), null))).toDF("a").registerTempTable("jt_array") - setConf(HiveContext.CONVERT_METASTORE_PARQUET, true) + setConf(HiveUtils.CONVERT_METASTORE_PARQUET, true) } override def afterAll(): Unit = { @@ -186,7 +186,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { "jt", "jt_array", "test_parquet") - setConf(HiveContext.CONVERT_METASTORE_PARQUET, false) + setConf(HiveUtils.CONVERT_METASTORE_PARQUET, false) } test(s"conversion is working") { @@ -619,7 +619,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { withTable("array_of_struct") { val conf = Seq( - HiveContext.CONVERT_METASTORE_PARQUET.key -> "false", + HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false", SQLConf.PARQUET_BINARY_AS_STRING.key -> "true", SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") From a8811fced3c3c47c1b12136a9bbd2aabfce786d4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 21 Apr 2016 14:43:11 -0700 Subject: [PATCH 3/5] Revert "Squashed commit of the following:" This reverts commit 92eadb598a078c3d302b603def70afaf0b54709f. 
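For readers skimming this series, the user-visible effect of the revert is to swap the short-lived SparkSession.withHiveSupport(sc) helper (removed in the hunks below) back out for the original HiveContext entry point. A minimal sketch of the restored usage, illustrative only and assuming a local build of this branch (the src table mirrors the HiveFromSpark example):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    object HiveContextSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "hive-context-sketch")
        // Before this revert one would have written:
        //   val session = SparkSession.withHiveSupport(sc)
        // Afterwards the dedicated Hive entry point is back:
        val hiveContext = new HiveContext(sc)
        import hiveContext.sql

        sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
        sql("SELECT COUNT(*) FROM src").collect().foreach(println)
        sc.stop()
      }
    }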
--- .../examples/sql/hive/HiveFromSpark.scala | 7 ++-- .../org/apache/spark/sql/SparkSession.scala | 8 +--- .../hive/thriftserver/HiveThriftServer2.scala | 17 ++++---- .../SparkExecuteStatementOperation.scala | 22 +++++------ .../hive/thriftserver/SparkSQLCLIDriver.scala | 2 +- .../thriftserver/SparkSQLCLIService.scala | 8 ++-- .../hive/thriftserver/SparkSQLDriver.scala | 6 +-- .../sql/hive/thriftserver/SparkSQLEnv.scala | 21 +++++----- .../thriftserver/SparkSQLSessionManager.scala | 11 +++--- .../server/SparkSQLOperationManager.scala | 12 +++--- .../spark/sql/hive/HiveSessionState.scala | 2 +- .../spark/sql/hive/HiveSharedState.scala | 3 +- .../org/apache/spark/sql/hive/HiveUtils.scala | 39 +++++++++++++++++++ .../hive/execution/CreateTableAsSelect.scala | 2 +- .../apache/spark/sql/hive/test/TestHive.scala | 2 +- .../spark/sql/hive/JavaDataFrameSuite.java | 2 +- .../hive/JavaMetastoreDataSourcesSuite.java | 7 ++-- .../regression-test-SPARK-8489/Main.scala | 9 ++--- 18 files changed, 103 insertions(+), 77 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index ff33091621c14..b654a2c8d4a40 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -24,6 +24,7 @@ import com.google.common.io.{ByteStreams, Files} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql._ +import org.apache.spark.sql.hive.HiveContext object HiveFromSpark { case class Record(key: Int, value: String) @@ -42,9 +43,9 @@ object HiveFromSpark { // using HiveQL. Users who do not have an existing Hive deployment can still create a // HiveContext. When not configured by the hive-site.xml, the context automatically // creates metastore_db and warehouse in the current directory. - val sparkSession = SparkSession.withHiveSupport(sc) - import sparkSession.implicits._ - import sparkSession.sql + val hiveContext = new HiveContext(sc) + import hiveContext.implicits._ + import hiveContext.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 2fd7677f346ab..70d889b002e43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -904,7 +904,7 @@ class SparkSession private( } -object SparkSession { +private object SparkSession { private def sharedStateClassName(conf: SparkConf): String = { conf.get(CATALOG_IMPLEMENTATION) match { @@ -937,10 +937,4 @@ object SparkSession { } } - // TODO: do we want to expose this? 
- def withHiveSupport(sc: SparkContext): SparkSession = { - sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive") - new SparkSession(sc) - } - } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 24a25023a6e36..6703cdbac3d17 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -33,8 +33,7 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.HiveSessionState +import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab import org.apache.spark.sql.internal.SQLConf @@ -54,9 +53,9 @@ object HiveThriftServer2 extends Logging { * Starts a new thrift server with the given context. */ @DeveloperApi - def startWithContext(sqlContext: SQLContext): Unit = { + def startWithContext(sqlContext: HiveContext): Unit = { val server = new HiveThriftServer2(sqlContext) - server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) + server.init(sqlContext.sessionState.hiveconf) server.start() listener = new HiveThriftServer2Listener(server, sqlContext.conf) sqlContext.sparkContext.addSparkListener(listener) @@ -83,11 +82,11 @@ object HiveThriftServer2 extends Logging { } try { - val server = new HiveThriftServer2(SparkSQLEnv.sqlContext) - server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) + val server = new HiveThriftServer2(SparkSQLEnv.hiveContext) + server.init(SparkSQLEnv.hiveContext.sessionState.hiveconf) server.start() logInfo("HiveThriftServer2 started") - listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf) + listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf) SparkSQLEnv.sparkContext.addSparkListener(listener) uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) { Some(new ThriftServerTab(SparkSQLEnv.sparkContext)) @@ -262,7 +261,7 @@ object HiveThriftServer2 extends Logging { } } -private[hive] class HiveThriftServer2(sqlContext: SQLContext) +private[hive] class HiveThriftServer2(hiveContext: HiveContext) extends HiveServer2 with ReflectedCompositeService { // state is tracked internally so that the server only attempts to shut down if it successfully @@ -270,7 +269,7 @@ private[hive] class HiveThriftServer2(sqlContext: SQLContext) private val started = new AtomicBoolean(false) override def init(hiveConf: HiveConf) { - val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext) + val sparkSqlCliService = new SparkSQLCLIService(this, hiveContext) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 37c14f8648b6d..332f46e6f7269 100644 --- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -33,9 +33,9 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext} +import org.apache.spark.sql.{DataFrame, Row => SparkRow} import org.apache.spark.sql.execution.command.SetCommand -import org.apache.spark.sql.hive.{HiveUtils, HiveMetastoreTypes, HiveSessionState} +import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveSessionState, HiveUtils} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{Utils => SparkUtils} @@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation( statement: String, confOverlay: JMap[String, String], runInBackground: Boolean = true) - (sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String]) + (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String]) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with Logging { @@ -68,7 +68,7 @@ private[hive] class SparkExecuteStatementOperation( def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. - sqlContext.sparkContext.clearJobGroup() + hiveContext.sparkContext.clearJobGroup() logDebug(s"CLOSING $statementId") cleanup(OperationState.CLOSED) } @@ -193,9 +193,9 @@ private[hive] class SparkExecuteStatementOperation( statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] // Always use the latest class loader provided by executionHive's state. 
- val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader + val executionHiveClassLoader = + hiveContext.sessionState.executionHive.state.getConf.getClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) HiveThriftServer2.listener.onStatementStart( @@ -204,12 +204,12 @@ private[hive] class SparkExecuteStatementOperation( statement, statementId, parentSession.getUsername) - sqlContext.sparkContext.setJobGroup(statementId, statement) + hiveContext.sparkContext.setJobGroup(statementId, statement) sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => - sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) + hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } try { - result = sqlContext.sql(statement) + result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) result.queryExecution.logical match { case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) => @@ -220,7 +220,7 @@ private[hive] class SparkExecuteStatementOperation( HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString()) iter = { val useIncrementalCollect = - sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean if (useIncrementalCollect) { result.toLocalIterator.asScala } else { @@ -253,7 +253,7 @@ private[hive] class SparkExecuteStatementOperation( override def cancel(): Unit = { logInfo(s"Cancel '$statement' with $statementId") if (statementId != null) { - sqlContext.sparkContext.cancelJobGroup(statementId) + hiveContext.sparkContext.cancelJobGroup(statementId) } cleanup(OperationState.CANCELED) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 1402e0a687290..057fbbe6d93ce 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -150,7 +150,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { } if (sessionState.database != null) { - SparkSQLEnv.sqlContext.sessionState.catalog.setCurrentDatabase( + SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase( s"${sessionState.database}") } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 1b17a9a56e5b9..6fe57554cf580 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -33,17 +33,17 @@ import org.apache.hive.service.auth.HiveAuthFactory import org.apache.hive.service.cli._ import org.apache.hive.service.server.HiveServer2 -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext) +private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: HiveContext) extends CLIService(hiveServer) with ReflectedCompositeService 
{ override def init(hiveConf: HiveConf) { setSuperField(this, "hiveConf", hiveConf) - val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sqlContext) + val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, hiveContext) setSuperField(this, "sessionManager", sparkSqlSessionManager) addService(sparkSqlSessionManager) var sparkServiceUGI: UserGroupInformation = null @@ -66,7 +66,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC getInfoType match { case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL") case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL") - case GetInfoType.CLI_DBMS_VER => new GetInfoValue(sqlContext.sparkContext.version) + case GetInfoType.CLI_DBMS_VER => new GetInfoValue(hiveContext.sparkContext.version) case _ => super.getInfo(sessionHandle, getInfoType) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 87bebb2bddd8b..7e8eada5adb4f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SQLContext} -import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveQueryExecution} +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveQueryExecution} private[hive] class SparkSQLDriver( - val context: SQLContext = SparkSQLEnv.sqlContext) + val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index a649aabaa24c3..7b43dec698d69 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -25,18 +25,18 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.StatsReportListener import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.hive.{HiveUtils, HiveSessionState} +import org.apache.spark.sql.hive.{HiveContext, HiveUtils} import org.apache.spark.util.Utils /** A singleton object for the master program. The slaves should not access this. 
*/ private[hive] object SparkSQLEnv extends Logging { logDebug("Initializing SparkSQLEnv") - var sqlContext: SQLContext = _ + var hiveContext: HiveContext = _ var sparkContext: SparkContext = _ def init() { - if (sqlContext == null) { + if (hiveContext == null) { val sparkConf = new SparkConf(loadDefaults = true) val maybeSerializer = sparkConf.getOption("spark.serializer") val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking") @@ -57,17 +57,16 @@ private[hive] object SparkSQLEnv extends Logging { sparkContext = new SparkContext(sparkConf) sparkContext.addSparkListener(new StatsReportListener()) - sqlContext = SparkSession.withHiveSupport(sparkContext).wrapped - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] + hiveContext = new HiveContext(sparkContext) - sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) - sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) - sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) + hiveContext.sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) + hiveContext.sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) + hiveContext.sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) - sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) + hiveContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) if (log.isDebugEnabled) { - sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted + hiveContext.sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") } } } @@ -80,7 +79,7 @@ private[hive] object SparkSQLEnv extends Logging { if (SparkSQLEnv.sparkContext != null) { sparkContext.stop() sparkContext = null - sqlContext = null + hiveContext = null } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 2697c94fd7222..98e5da44a162e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -28,12 +28,12 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.hive.service.server.HiveServer2 import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.{HiveUtils, HiveSessionState} +import org.apache.spark.sql.hive.{HiveContext, HiveUtils} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager -private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) +private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: HiveContext) extends SessionManager(hiveServer) with ReflectedCompositeService { @@ -72,11 +72,10 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: val session = super.getSession(sessionHandle) HiveThriftServer2.listener.onSessionCreated( session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - val ctx = if (sessionState.hiveThriftServerSingleSession) { - sqlContext + val ctx = if 
(hiveContext.sessionState.hiveThriftServerSingleSession) { + hiveContext } else { - sqlContext.newSession() + hiveContext.newSession() } ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 79625239dea0e..da410c68c851d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -26,8 +26,7 @@ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operati import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.HiveSessionState +import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation} /** @@ -40,18 +39,17 @@ private[thriftserver] class SparkSQLOperationManager() .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = Map[SessionHandle, String]() - val sessionToContexts = Map[SessionHandle, SQLContext]() + val sessionToContexts = Map[SessionHandle, HiveContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { - val sqlContext = sessionToContexts(parentSession.getSessionHandle) - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - val runInBackground = async && sessionState.hiveThriftServerAsync + val hiveContext = sessionToContexts(parentSession.getSessionHandle) + val runInBackground = async && hiveContext.sessionState.hiveThriftServerAsync val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, - runInBackground)(sqlContext, sessionToActivePool) + runInBackground)(hiveContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index d464b10eee69e..7bb334dff6a74 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf} /** - * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive. + * A class that holds all session-specific state in a given [[HiveContext]]. 
*/ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala index fb1f59eed3407..1d8ce3099d2a9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala @@ -23,8 +23,7 @@ import org.apache.spark.sql.internal.SharedState /** - * A class that holds all state shared across sessions in a given - * [[org.apache.spark.sql.SparkSession]] backed by Hive. + * A class that holds all state shared across sessions in a given [[HiveContext]]. */ private[hive] class HiveSharedState(override val sparkContext: SparkContext) extends SharedState(sparkContext) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index a8561192ed849..44d3cc257b065 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION import org.apache.spark.sql._ @@ -44,6 +45,44 @@ import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * An instance of the Spark SQL execution engine that integrates with data stored in Hive. + * Configuration for Hive is read from hive-site.xml on the classpath. + * + * @since 1.0.0 + */ +class HiveContext private[hive]( + @transient private val sparkSession: SparkSession, + isRootContext: Boolean) + extends SQLContext(sparkSession, isRootContext) with Logging { + + self => + + def this(sc: SparkContext) = { + this(new SparkSession(HiveUtils.withHiveExternalCatalog(sc)), true) + } + + def this(sc: JavaSparkContext) = this(sc.sc) + + /** + * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF, + * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader + * and Hive client (both of execution and metadata) with existing HiveContext. 
+ */ + override def newSession(): HiveContext = { + new HiveContext(sparkSession.newSession(), isRootContext = false) + } + + protected[sql] override def sessionState: HiveSessionState = { + sparkSession.sessionState.asInstanceOf[HiveSessionState] + } + + protected[sql] override def sharedState: HiveSharedState = { + sparkSession.sharedState.asInstanceOf[HiveSharedState] + } + +} + private[spark] object HiveUtils extends Logging { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 9c98587b6a61b..ceb7f3b890949 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.hive.{HiveMetastoreTypes, MetastoreRelation} +import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation} /** * Create table and insert the query result into it. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 908da95830cf3..7f820c8f15ded 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -72,7 +72,7 @@ object TestHive * test cases that rely on TestHive must be serialized. 
*/ class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootContext: Boolean) - extends SQLContext(sparkSession, isRootContext) { + extends HiveContext(sparkSession, isRootContext) { def this(sc: SparkContext) { this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)), true) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java index 64f2ded447a06..397421ae92a47 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java @@ -36,7 +36,7 @@ public class JavaDataFrameSuite { private transient JavaSparkContext sc; - private transient SQLContext hc; + private transient HiveContext hc; Dataset df; diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index f13c32db9d230..2fc38e2b2d2e7 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -36,7 +36,6 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.QueryTest$; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.hive.test.TestHive$; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; @@ -47,7 +46,7 @@ public class JavaMetastoreDataSourcesSuite { private transient JavaSparkContext sc; - private transient SQLContext sqlContext; + private transient HiveContext sqlContext; File path; Path hiveManagedPath; @@ -71,9 +70,9 @@ public void setUp() throws IOException { if (path.exists()) { path.delete(); } - HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog(); hiveManagedPath = new Path( - catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable"))); + sqlContext.sessionState().catalog().hiveDefaultTableFilePath( + new TableIdentifier("javaSavedTable"))); fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); if (fs.exists(hiveManagedPath)){ fs.delete(hiveManagedPath, true); diff --git a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala index 10a017df831e0..2590040f2ec1c 100644 --- a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala +++ b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala @@ -15,8 +15,8 @@ * limitations under the License. */ -import org.apache.spark.SparkContext -import org.apache.spark.sql.SparkSession +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.hive.HiveContext /** * Entry point in test application for SPARK-8489. @@ -28,16 +28,15 @@ import org.apache.spark.sql.SparkSession * * This is used in org.apache.spark.sql.hive.HiveSparkSubmitSuite. */ -// TODO: actually rebuild this jar with the new changes. object Main { def main(args: Array[String]) { // scalastyle:off println println("Running regression test for SPARK-8489.") val sc = new SparkContext("local", "testing") - val sparkSession = SparkSession.withHiveSupport(sc) + val hc = new HiveContext(sc) // This line should not throw scala.reflect.internal.MissingRequirementError. // See SPARK-8470 for more detail. 
- val df = sparkSession.createDataFrame(Seq(MyCoolClass("1", "2", "3"))) + val df = hc.createDataFrame(Seq(MyCoolClass("1", "2", "3"))) df.collect() println("Regression test for SPARK-8489 success!") // scalastyle:on println From ac0d7daacba650752c4913421f23a5fc7f7ddf0d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 21 Apr 2016 14:47:17 -0700 Subject: [PATCH 4/5] Remove some unused imports --- .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +- .../org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala | 1 - .../spark/sql/hive/thriftserver/SparkSQLSessionManager.scala | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 332f46e6f7269..3025660301663 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Row => SparkRow} import org.apache.spark.sql.execution.command.SetCommand -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveSessionState, HiveUtils} +import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveUtils} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{Utils => SparkUtils} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 7b43dec698d69..465457f1babfe 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.StatsReportListener -import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.hive.{HiveContext, HiveUtils} import org.apache.spark.util.Utils diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 98e5da44a162e..a0beffdaa25e5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -27,7 +27,6 @@ import org.apache.hive.service.cli.session.SessionManager import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.hive.service.server.HiveServer2 -import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.{HiveContext, HiveUtils} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager From d4f11815e5ae02409eaec973b8883fee48503955 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 21 Apr 2016 16:47:54 -0700 
Subject: [PATCH 5/5] Fix compile --- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 6a20d7c25b682..e95069e830dee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -23,8 +23,7 @@ import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._ import org.apache.hadoop.hive.ql.exec.Utilities -import org.apache.hadoop.hive.ql.metadata.{HiveUtils, Partition => HivePartition, - Table => HiveTable} +import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable} import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.serde2.Deserializer import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, @@ -300,7 +299,8 @@ private[hive] object HiveTableUtil { def configureJobPropertiesForStorageHandler( tableDesc: TableDesc, jobConf: JobConf, input: Boolean) { val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE) - val storageHandler = HiveUtils.getStorageHandler(jobConf, property) + val storageHandler = + org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property) if (storageHandler != null) { val jobProperties = new util.LinkedHashMap[String, String] if (input) {
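The compile fix in this last hunk is a name-clash workaround rather than a behavioural change: the new org.apache.spark.sql.hive.HiveUtils object shares its simple name with Hive's org.apache.hadoop.hive.ql.metadata.HiveUtils, so the latter is no longer imported in TableReader.scala and its single call site is written out in full. A minimal sketch of the resulting pattern, illustrative only (storageHandlerFor is a made-up wrapper; both classes are assumed to be on the classpath as in this branch):

    import org.apache.hadoop.mapred.JobConf

    // Mirrors the hunk above: Hive's HiveUtils is no longer imported, so the
    // fully qualified name is used and the bare name HiveUtils stays free for
    // Spark's own org.apache.spark.sql.hive.HiveUtils object.
    def storageHandlerFor(jobConf: JobConf, className: String) =
      org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, className)

An import rename such as import org.apache.hadoop.hive.ql.metadata.{HiveUtils => HiveMetadataUtils} would disambiguate equally well with a shorter call site; the patch simply spells out the fully qualified name at the one place it is needed.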