From 292cbce301baca6ae1a2db1892325b05cbfa32c2 Mon Sep 17 00:00:00 2001
From: Feng Liu
Date: Sun, 25 Feb 2018 23:08:47 -0800
Subject: [PATCH 1/4] lazy

---
 R/pkg/tests/fulltests/test_sparkSQL.R           |  8 +++++++-
 .../sql/catalyst/catalog/SessionCatalog.scala   | 11 +++++++----
 .../sql/internal/BaseSessionStateBuilder.scala  |  4 ++--
 .../HiveMetastoreLazyInitializationSuite.scala  | 18 +++++++++++++++---
 .../spark/sql/hive/HiveSessionCatalog.scala     |  8 ++++----
 .../sql/hive/HiveSessionStateBuilder.scala      | 10 +++++-----
 6 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 5197838eaac6..084596ee1900 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -33,6 +33,9 @@ markUtf8 <- function(s) {
 }
 
 setHiveContext <- function(sc) {
+  previousSession <- get(".sparkRsession", envir = .sparkREnv)
+  # In case the spark conf is changed during the test, let's also make a copy of the confs.
+  previousConf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", previousSession)
   if (exists(".testHiveSession", envir = .sparkREnv)) {
     hiveSession <- get(".testHiveSession", envir = .sparkREnv)
   } else {
@@ -46,16 +49,19 @@ setHiveContext <- function(sc) {
     })
     hiveSession <- callJMethod(hiveCtx, "sparkSession")
   }
-  previousSession <- get(".sparkRsession", envir = .sparkREnv)
   assign(".sparkRsession", hiveSession, envir = .sparkREnv)
   assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
+  assign(".prevSessionConf", previousConf, envir = .sparkREnv)
   hiveSession
 }
 
 unsetHiveContext <- function() {
   previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
+  previousConf <- get(".prevSessionConf", envir = .sparkREnv)
+  callJStatic("org.apache.spark.sql.api.r.SQLUtils", "setSparkContextSessionConf", previousSessioni, previousConf)
   assign(".sparkRsession", previousSession, envir = .sparkREnv)
   remove(".prevSparkRsession", envir = .sparkREnv)
+  remove(".prevSparkConf", envir = .sparkREnv)
 }
 
 # Tests for SparkSQL functions in SparkR
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4b119c75260a..64e7ca11270b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -54,8 +54,8 @@ object SessionCatalog {
  * This class must be thread-safe.
  */
 class SessionCatalog(
-    val externalCatalog: ExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
+    externalCatalogBuilder: () => ExternalCatalog,
+    globalTempViewManagerBuilder: () => GlobalTempViewManager,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
@@ -70,8 +70,8 @@ class SessionCatalog(
       functionRegistry: FunctionRegistry,
       conf: SQLConf) {
     this(
-      externalCatalog,
-      new GlobalTempViewManager("global_temp"),
+      () => externalCatalog,
+      () => new GlobalTempViewManager("global_temp"),
       functionRegistry,
       conf,
       new Configuration(),
@@ -87,6 +87,9 @@ class SessionCatalog(
       new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
   }
 
+  lazy val externalCatalog = externalCatalogBuilder()
+  lazy val globalTempViewManager = globalTempViewManagerBuilder()
+
   /** List of temporary views, mapping from table name to their logical plan. */
   @GuardedBy("this")
   protected val tempViews = new mutable.HashMap[String, LogicalPlan]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 007f8760edf8..3a0db7e16c23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -130,8 +130,8 @@ abstract class BaseSessionStateBuilder(
    */
   protected lazy val catalog: SessionCatalog = {
     val catalog = new SessionCatalog(
-      session.sharedState.externalCatalog,
-      session.sharedState.globalTempViewManager,
+      () => session.sharedState.externalCatalog,
+      () => session.sharedState.globalTempViewManager,
       functionRegistry,
       conf,
       SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
index 3f135cc86498..8f04e9013b64 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
@@ -19,17 +19,19 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.test.SQLTestUtilsBase
 import org.apache.spark.util.Utils
 
-class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
+class HiveMetastoreLazyInitializationSuite extends SparkFunSuite with SQLTestUtilsBase {
 
-  test("lazily initialize Hive client") {
-    val spark = SparkSession.builder()
+  override def spark: SparkSession = SparkSession.builder()
       .appName("HiveMetastoreLazyInitializationSuite")
       .master("local[2]")
       .enableHiveSupport()
       .config("spark.hadoop.hive.metastore.uris", "thrift://127.0.0.1:11111")
       .getOrCreate()
+
+  test("lazily initialize Hive client") {
     val originalLevel = org.apache.log4j.Logger.getRootLogger().getLevel
     try {
       // Avoid outputting a lot of expected warning logs
@@ -38,6 +40,16 @@ class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
       // We should be able to run Spark jobs without Hive client.
       assert(spark.sparkContext.range(0, 1).count() === 1)
 
+      // We should be able to use Spark SQL if no table references.
+      assert(spark.sql("select 1 + 1").count() === 1)
+      assert(spark.range(0, 1).count() === 1)
+
+      // We should be able to use fs
+      withTempPath { dir =>
+        spark.range(0, 1).write.parquet(dir.getAbsolutePath)
+        assert(spark.read.parquet(dir.getAbsolutePath).count() === 1)
+      }
+
       // Make sure that we are not using the local derby metastore.
       val exceptionString = Utils.exceptionString(intercept[AnalysisException] {
         spark.sql("show tables")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 1f11adbd4f62..e5aff3b99d0b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -39,8 +39,8 @@ import org.apache.spark.sql.types.{DecimalType, DoubleType}
 
 private[sql] class HiveSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
+    externalCatalogBuilder: () => HiveExternalCatalog,
+    globalTempViewManagerBuilder: () => GlobalTempViewManager,
     val metastoreCatalog: HiveMetastoreCatalog,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
@@ -48,8 +48,8 @@ private[sql] class HiveSessionCatalog(
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader)
   extends SessionCatalog(
-    externalCatalog,
-    globalTempViewManager,
+    externalCatalogBuilder,
+    globalTempViewManagerBuilder,
     functionRegistry,
     conf,
     hadoopConf,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 12c74368dd18..40b9bb51ca9a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -42,8 +42,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
    * Create a Hive aware resource loader.
    */
   override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client
-    new HiveSessionResourceLoader(session, client)
+    new HiveSessionResourceLoader(session, () => externalCatalog.client)
   }
 
   /**
@@ -51,8 +50,8 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
    */
   override protected lazy val catalog: HiveSessionCatalog = {
     val catalog = new HiveSessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
+      () => externalCatalog,
+      () => session.sharedState.globalTempViewManager,
       new HiveMetastoreCatalog(session),
       functionRegistry,
       conf,
@@ -105,8 +104,9 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
 
 class HiveSessionResourceLoader(
     session: SparkSession,
-    client: HiveClient)
+    clientBuilder: () => HiveClient)
   extends SessionResourceLoader(session) {
+  private lazy val client = clientBuilder()
   override def addJar(path: String): Unit = {
     client.addJar(path)
     super.addJar(path)

From 57f2a3dd435eeb09a0c3c3735482de53d3a7e7d8 Mon Sep 17 00:00:00 2001
From: Feng Liu
Date: Mon, 26 Feb 2018 17:19:53 -0800
Subject: [PATCH 2/4] comments

---
 R/pkg/tests/fulltests/test_sparkSQL.R | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 084596ee1900..c11b7704da97 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -58,10 +58,10 @@ setHiveContext <- function(sc) {
 unsetHiveContext <- function() {
   previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
   previousConf <- get(".prevSessionConf", envir = .sparkREnv)
-  callJStatic("org.apache.spark.sql.api.r.SQLUtils", "setSparkContextSessionConf", previousSessioni, previousConf)
+  callJStatic("org.apache.spark.sql.api.r.SQLUtils", "setSparkContextSessionConf", previousSession, previousConf)
   assign(".sparkRsession", previousSession, envir = .sparkREnv)
   remove(".prevSparkRsession", envir = .sparkREnv)
-  remove(".prevSparkConf", envir = .sparkREnv)
+  remove(".prevSessionConf", envir = .sparkREnv)
 }
 
 # Tests for SparkSQL functions in SparkR

From 6a962e900a2b9de2e434f2a6ec1eb256ea87a774 Mon Sep 17 00:00:00 2001
From: Feng Liu
Date: Mon, 26 Feb 2018 23:05:14 -0800
Subject: [PATCH 3/4] fix test

---
 R/pkg/tests/fulltests/test_sparkSQL.R | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index c11b7704da97..bd0a0dcd0674 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -33,9 +33,6 @@ markUtf8 <- function(s) {
 }
 
 setHiveContext <- function(sc) {
-  previousSession <- get(".sparkRsession", envir = .sparkREnv)
-  # In case the spark conf is changed during the test, let's also make a copy of the confs.
-  previousConf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", previousSession)
   if (exists(".testHiveSession", envir = .sparkREnv)) {
     hiveSession <- get(".testHiveSession", envir = .sparkREnv)
   } else {
@@ -49,19 +46,16 @@ setHiveContext <- function(sc) {
     })
     hiveSession <- callJMethod(hiveCtx, "sparkSession")
   }
+  previousSession <- get(".sparkRsession", envir = .sparkREnv)
   assign(".sparkRsession", hiveSession, envir = .sparkREnv)
   assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
-  assign(".prevSessionConf", previousConf, envir = .sparkREnv)
   hiveSession
 }
 
 unsetHiveContext <- function() {
   previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
-  previousConf <- get(".prevSessionConf", envir = .sparkREnv)
-  callJStatic("org.apache.spark.sql.api.r.SQLUtils", "setSparkContextSessionConf", previousSession, previousConf)
   assign(".sparkRsession", previousSession, envir = .sparkREnv)
   remove(".prevSparkRsession", envir = .sparkREnv)
-  remove(".prevSessionConf", envir = .sparkREnv)
 }
 
 # Tests for SparkSQL functions in SparkR
@@ -73,6 +67,8 @@ sparkSession <- if (windows_with_hadoop()) {
   sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
 }
 sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+# materialize the catalog implementation
+listTables()
 
 mockLines <- c("{\"name\":\"Michael\"}",
                "{\"name\":\"Andy\", \"age\":30}",

From d0eacc2048cf07193aca20f8011b677099884278 Mon Sep 17 00:00:00 2001
From: Feng Liu
Date: Thu, 1 Mar 2018 14:17:57 -0800
Subject: [PATCH 4/4] remove the new dep

---
 .../HiveMetastoreLazyInitializationSuite.scala | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
index 8f04e9013b64..277df548aefd 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
@@ -19,19 +19,17 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.test.SQLTestUtilsBase
 import org.apache.spark.util.Utils
 
-class HiveMetastoreLazyInitializationSuite extends SparkFunSuite with SQLTestUtilsBase {
+class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
 
-  override def spark: SparkSession = SparkSession.builder()
+  test("lazily initialize Hive client") {
+    val spark = SparkSession.builder()
       .appName("HiveMetastoreLazyInitializationSuite")
       .master("local[2]")
       .enableHiveSupport()
       .config("spark.hadoop.hive.metastore.uris", "thrift://127.0.0.1:11111")
       .getOrCreate()
-
-  test("lazily initialize Hive client") {
     val originalLevel = org.apache.log4j.Logger.getRootLogger().getLevel
     try {
       // Avoid outputting a lot of expected warning logs
@@ -45,9 +43,13 @@ class HiveMetastoreLazyInitializationSuite extends SparkFunSuite with SQLTestUti
       assert(spark.range(0, 1).count() === 1)
 
       // We should be able to use fs
-      withTempPath { dir =>
-        spark.range(0, 1).write.parquet(dir.getAbsolutePath)
-        assert(spark.read.parquet(dir.getAbsolutePath).count() === 1)
+      val path = Utils.createTempDir()
+      path.delete()
+      try {
+        spark.range(0, 1).write.parquet(path.getAbsolutePath)
+        assert(spark.read.parquet(path.getAbsolutePath).count() === 1)
+      } finally {
+        Utils.deleteRecursively(path)
       }
 
       // Make sure that we are not using the local derby metastore.